summaryrefslogtreecommitdiff
path: root/code/common/AsynchronousCommandProcessor.cs
blob: b84629f2c070e4439ce9cb32b3a8bb75e84d7b08 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
using System;
using System.Collections.Generic;
using System.Threading;

namespace common
{
    public class AsynchronousCommandProcessor : CommandProcessor
    {
        readonly Queue<Command> queued_commands;
        readonly EventWaitHandle manual_reset;
        readonly IList<Thread> worker_threads;
        bool keep_working;

        static readonly Command Empty = new EmptyCommand();

        public AsynchronousCommandProcessor()
        {
            queued_commands = new Queue<Command>();
            worker_threads = new List<Thread>();
            manual_reset = new ManualResetEvent(false);
        }

        public void add(Command command_to_process)
        {
            lock (queued_commands)
            {
                if (queued_commands.Contains(command_to_process)) return;
                queued_commands.Enqueue(command_to_process);
                reset_thread();
            }
        }

        public void run()
        {
            reset_thread();
            keep_working = true;
            var worker_thread = new Thread(run_commands);
            worker_thread.SetApartmentState(ApartmentState.STA);
            worker_threads.Add(worker_thread);
            worker_thread.Start();
        }

        public void stop()
        {
            keep_working = false;
            manual_reset.Set();
            //manual_reset.Close();
        }

        [STAThread]
        void run_commands()
        {
            while (keep_working)
            {
                manual_reset.WaitOne();
                run_next_command();
            }
        }

        void run_next_command()
        {
            var command = Empty;
            within_lock(() =>
            {
                if (queued_commands.Count == 0)
                    manual_reset.Reset();
                else
                    command = queued_commands.Dequeue();
            });
            safely_invoke(() =>
            {
                command.run();
            });
            reset_thread();
        }

        void safely_invoke(Action action)
        {
            try
            {
                action();
            }
            catch (Exception e)
            {
                e.add_to_log();
            }
        }

        void reset_thread()
        {
            within_lock(() =>
            {
                if (queued_commands.Count > 0) manual_reset.Set();
                else manual_reset.Reset();
            });
        }

        void within_lock(Action action)
        {
            lock (queued_commands)
            {
                action();
            }
        }
    }
}