summaryrefslogtreecommitdiff
path: root/lib/AsynchronousCommandProcessor.cs
blob: c0b8164260cf492caa604658bb21aeca37076cbe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
using System;
using System.Collections.Generic;
using System.Threading;
using jive;

namespace jive
{
  public class AsynchronousCommandProcessor : CommandProcessor
  {
    readonly Queue<Command> queued_commands;
    readonly EventWaitHandle manual_reset;
    readonly IList<Thread> worker_threads;
    bool keep_working;

    static public readonly Command Empty = new EmptyCommand();

    public AsynchronousCommandProcessor()
    {
      queued_commands = new Queue<Command>();
      worker_threads = new List<Thread>();
      manual_reset = new ManualResetEvent(false);
    }

    public void add(Action command)
    {
      add(new AnonymousCommand(command));
    }

    public void add(Command command_to_process)
    {
      lock (queued_commands)
      {
        if (queued_commands.Contains(command_to_process)) return;
        queued_commands.Enqueue(command_to_process);
        reset_thread();
      }
    }

    public void run()
    {
      reset_thread();
      keep_working = true;
      var worker_thread = new Thread(run_commands);
      worker_thread.SetApartmentState(ApartmentState.STA);
      worker_threads.Add(worker_thread);
      worker_thread.Start();
    }

    public void stop()
    {
      keep_working = false;
      manual_reset.Set();
      //manual_reset.Close();
    }

    [STAThread]
    void run_commands()
    {
      while (keep_working)
      {
        manual_reset.WaitOne();
        run_next_command();
      }
    }

    void run_next_command()
    {
      var command = Empty;
      within_lock(() =>
          {
          if (queued_commands.Count == 0)
          manual_reset.Reset();
          else
          command = queued_commands.Dequeue();
          });
      safely_invoke(() =>
          {
          command.run();
          });
      reset_thread();
    }

    void safely_invoke(Action action)
    {
      try
      {
        action();
      }
      catch (Exception e)
      {
        this.log().error(e);
      }
    }

    void reset_thread()
    {
      within_lock(() =>
          {
          if (queued_commands.Count > 0) manual_reset.Set();
          else manual_reset.Reset();
          });
    }

    void within_lock(Action action)
    {
      lock (queued_commands)
      {
        action();
      }
    }
  }
}