summaryrefslogtreecommitdiff
path: root/product/service.infrastructure/threading/AsynchronousCommandProcessor.cs
blob: 91bd8355e6b85e14db2087d9eb6208129e3d71f5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading;
using gorilla.commons.utility;

namespace MoMoney.Service.Infrastructure.Threading
{
    public class AsynchronousCommandProcessor : CommandProcessor
    {
        readonly Queue<Command> queued_commands;
        readonly EventWaitHandle manual_reset;
        readonly IList<Thread> worker_threads;
        bool keep_working;

        public AsynchronousCommandProcessor()
        {
            queued_commands = new Queue<Command>();
            worker_threads = new List<Thread>();
            manual_reset = new ManualResetEvent(false);
        }

        public void add(Expression<Action> action_to_process)
        {
            add(new AnonymousCommand(action_to_process));
        }

        public void add(Command command_to_process)
        {
            lock (queued_commands)
            {
                if (queued_commands.Contains(command_to_process)) return;
                queued_commands.Enqueue(command_to_process);
                reset_thread();
            }
        }

        public void run()
        {
            reset_thread();
            keep_working = true;
            var worker_thread = new Thread(run_commands);
            worker_thread.SetApartmentState(ApartmentState.STA);
            worker_threads.Add(worker_thread);
            worker_thread.Start();
        }

        public void stop()
        {
            keep_working = false;
            manual_reset.Set();
            //manual_reset.Close();
        }

        [STAThread]
        void run_commands()
        {
            while (keep_working)
            {
                manual_reset.WaitOne();
                run_next_command();
            }
        }

        void run_next_command()
        {
            Command command;
            lock (queued_commands)
            {
                if (queued_commands.Count == 0)
                {
                    manual_reset.Reset();
                    return;
                }
                command = queued_commands.Dequeue();
            }
            command.run();
            reset_thread();
        }

        void reset_thread()
        {
            lock (queued_commands)
            {
                if (queued_commands.Count > 0) manual_reset.Set();
                else manual_reset.Reset();
            }
        }
    }
}