Scott Hiett commited on
Commit
31b770c
2 Parent(s): 4ef9714 abcde98

Merge pull request #1 from hiett/sh/transaction-support

Browse files
config/config.exs CHANGED
@@ -1,8 +1,8 @@
1
  import Config
2
 
3
  config :srh,
4
- mode: "file",
5
- file_path: "srh-config/tokens.json",
6
- port: 8080
7
 
8
  import_config "#{config_env()}.exs"
 
1
  import Config
2
 
3
  config :srh,
4
+ mode: "file",
5
+ file_path: "srh-config/tokens.json",
6
+ port: 8080
7
 
8
  import_config "#{config_env()}.exs"
config/prod.exs CHANGED
@@ -1,4 +1,4 @@
1
  import Config
2
 
3
  config :srh,
4
- port: 80
 
1
  import Config
2
 
3
  config :srh,
4
+ port: 80
config/runtime.exs CHANGED
@@ -1,6 +1,6 @@
1
  import Config
2
 
3
  config :srh,
4
- mode: System.get_env("TOKEN_RESOLUTION_MODE") || "file",
5
- file_path: System.get_env("TOKEN_RESOLUTION_FILE_PATH") || "srh-config/tokens.json",
6
- port: Integer.parse(System.get_env("PORT") || "8080")
 
1
  import Config
2
 
3
  config :srh,
4
+ mode: System.get_env("TOKEN_RESOLUTION_MODE") || "file",
5
+ file_path: System.get_env("TOKEN_RESOLUTION_FILE_PATH") || "srh-config/tokens.json",
6
+ port: Integer.parse(System.get_env("PORT") || "8080")
lib/srh.ex CHANGED
@@ -5,7 +5,7 @@ defmodule Srh do
5
 
6
  def start(_type, _args) do
7
  IO.puts("Using port #{@port}")
8
-
9
  children = [
10
  Srh.Auth.TokenResolver,
11
  {GenRegistry, worker_module: Srh.Redis.Client},
 
5
 
6
  def start(_type, _args) do
7
  IO.puts("Using port #{@port}")
8
+
9
  children = [
10
  Srh.Auth.TokenResolver,
11
  {GenRegistry, worker_module: Srh.Redis.Client},
lib/srh/http/base_router.ex CHANGED
@@ -23,6 +23,12 @@ defmodule Srh.Http.BaseRouter do
23
  |> handle_response(conn)
24
  end
25
 
 
 
 
 
 
 
26
  match _ do
27
  send_resp(conn, 404, "Endpoint not found")
28
  end
@@ -51,6 +57,9 @@ defmodule Srh.Http.BaseRouter do
51
  {:malformed_data, message} ->
52
  %{code: 400, message: message, json: false}
53
 
 
 
 
54
  {:not_authorized, message} ->
55
  %{code: 401, message: message, json: false}
56
 
 
23
  |> handle_response(conn)
24
  end
25
 
26
+ post "/multi-exec" do
27
+ conn
28
+ |> handle_extract_auth(&CommandHandler.handle_command_transaction_array(conn, &1))
29
+ |> handle_response(conn)
30
+ end
31
+
32
  match _ do
33
  send_resp(conn, 404, "Endpoint not found")
34
  end
 
57
  {:malformed_data, message} ->
58
  %{code: 400, message: message, json: false}
59
 
60
+ {:redis_error, data} ->
61
+ %{code: 400, message: Jason.encode!(data), json: true}
62
+
63
  {:not_authorized, message} ->
64
  %{code: 401, message: message, json: false}
65
 
lib/srh/http/command_handler.ex CHANGED
@@ -24,6 +24,17 @@ defmodule Srh.Http.CommandHandler do
24
  end
25
  end
26
 
 
 
 
 
 
 
 
 
 
 
 
27
  defp do_handle_command(command_array, token) do
28
  case TokenResolver.resolve(token) do
29
  {:ok, connection_info} ->
@@ -44,6 +55,16 @@ defmodule Srh.Http.CommandHandler do
44
  end
45
  end
46
 
 
 
 
 
 
 
 
 
 
 
47
  defp dispatch_command_array(_arr, _connection_info, responses \\ [])
48
 
49
  defp dispatch_command_array([current | rest], connection_info, responses) do
@@ -52,9 +73,8 @@ defmodule Srh.Http.CommandHandler do
52
  {:ok, result_map} ->
53
  [result_map | responses]
54
 
55
- {:malformed_data, result_json} ->
56
- # TODO: change up the chain to json this at the last moment, so this isn't here
57
- [Jason.decode!(result_json) | responses]
58
  end
59
 
60
  dispatch_command_array(rest, connection_info, updated_responses)
@@ -65,6 +85,61 @@ defmodule Srh.Http.CommandHandler do
65
  {:ok, Enum.reverse(responses)}
66
  end
67
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  defp dispatch_command(
69
  command_array,
70
  %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info
@@ -80,10 +155,10 @@ defmodule Srh.Http.CommandHandler do
80
 
81
  {:error, error} ->
82
  {
83
- :malformed_data,
84
- Jason.encode!(%{
85
  error: error.message
86
- })
87
  }
88
  end
89
 
 
24
  end
25
  end
26
 
27
+ def handle_command_transaction_array(conn, token) do
28
+ # Transactions use the same body format as pipelines, so we can use the same validator
29
+ case RequestValidator.validate_pipeline_redis_body(conn.body_params) do
30
+ {:ok, array_of_command_arrays} ->
31
+ do_handle_command_transaction_array(array_of_command_arrays, token)
32
+
33
+ {:error, error_message} ->
34
+ {:malformed_data, error_message}
35
+ end
36
+ end
37
+
38
  defp do_handle_command(command_array, token) do
39
  case TokenResolver.resolve(token) do
40
  {:ok, connection_info} ->
 
55
  end
56
  end
57
 
58
+ defp do_handle_command_transaction_array(array_of_command_arrays, token) do
59
+ case TokenResolver.resolve(token) do
60
+ {:ok, connection_info} ->
61
+ dispatch_command_transaction_array(array_of_command_arrays, connection_info)
62
+
63
+ {:error, msg} ->
64
+ {:not_authorized, msg}
65
+ end
66
+ end
67
+
68
  defp dispatch_command_array(_arr, _connection_info, responses \\ [])
69
 
70
  defp dispatch_command_array([current | rest], connection_info, responses) do
 
73
  {:ok, result_map} ->
74
  [result_map | responses]
75
 
76
+ {:redis_error, result} ->
77
+ [result | responses]
 
78
  end
79
 
80
  dispatch_command_array(rest, connection_info, updated_responses)
 
85
  {:ok, Enum.reverse(responses)}
86
  end
87
 
88
+ defp dispatch_command_transaction_array(
89
+ command_array,
90
+ %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info,
91
+ responses \\ []
92
+ ) do
93
+ case GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info]) do
94
+ {:ok, client_pid} ->
95
+ # Borrow a client, then run all of the commands (wrapped in MULTI and EXEC)
96
+ worker_pid = Client.borrow_worker(client_pid)
97
+
98
+ wrapped_command_array = [["MULTI"] | command_array]
99
+ do_dispatch_command_transaction_array(wrapped_command_array, worker_pid, responses)
100
+
101
+ # Now manually run the EXEC - this is what contains the information to form the response, not the above
102
+ result = case ClientWorker.redis_command(worker_pid, ["EXEC"]) do
103
+ {:ok, res} ->
104
+ {
105
+ :ok,
106
+ res
107
+ |> Enum.map(&(%{result: &1}))
108
+ }
109
+ # TODO: Can there be any inline errors here? Wouldn't they fail the whole tx?
110
+
111
+ {:error, error} ->
112
+ {:redis_error, %{error: error.message}}
113
+ end
114
+
115
+ Client.return_worker(client_pid, worker_pid)
116
+
117
+ result
118
+ {:error, msg} ->
119
+ {:server_error, msg}
120
+ end
121
+ end
122
+
123
+ defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses) when is_pid(worker_pid) do
124
+ updated_responses = case ClientWorker.redis_command(worker_pid, current) do
125
+ {:ok, res} ->
126
+ [%{result: res} | responses]
127
+
128
+ {:error, error} ->
129
+ [
130
+ %{
131
+ error: error.message
132
+ } | responses
133
+ ]
134
+ end
135
+
136
+ do_dispatch_command_transaction_array(rest, worker_pid, updated_responses)
137
+ end
138
+
139
+ defp do_dispatch_command_transaction_array([], worker_pid, responses) when is_pid(worker_pid) do
140
+ {:ok, Enum.reverse(responses)}
141
+ end
142
+
143
  defp dispatch_command(
144
  command_array,
145
  %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info
 
155
 
156
  {:error, error} ->
157
  {
158
+ :redis_error,
159
+ %{
160
  error: error.message
161
+ }
162
  }
163
  end
164
 
lib/srh/redis/client.ex CHANGED
@@ -27,6 +27,14 @@ defmodule Srh.Redis.Client do
27
  GenServer.call(client, {:find_worker})
28
  end
29
 
 
 
 
 
 
 
 
 
30
  def handle_call({:find_worker}, _from, %{registry_pid: registry_pid} = state)
31
  when is_pid(registry_pid) do
32
  {:ok, worker} = ClientRegistry.find_worker(registry_pid)
@@ -34,10 +42,23 @@ defmodule Srh.Redis.Client do
34
  {:reply, worker, state}
35
  end
36
 
 
 
 
 
 
 
 
37
  def handle_call(_msg, _from, state) do
38
  {:reply, :ok, state}
39
  end
40
 
 
 
 
 
 
 
41
  def handle_cast(_msg, state) do
42
  {:noreply, state}
43
  end
 
27
  GenServer.call(client, {:find_worker})
28
  end
29
 
30
+ def borrow_worker(client) do
31
+ GenServer.call(client, {:borrow_worker})
32
+ end
33
+
34
+ def return_worker(client, pid) do
35
+ GenServer.cast(client, {:return_worker, pid})
36
+ end
37
+
38
  def handle_call({:find_worker}, _from, %{registry_pid: registry_pid} = state)
39
  when is_pid(registry_pid) do
40
  {:ok, worker} = ClientRegistry.find_worker(registry_pid)
 
42
  {:reply, worker, state}
43
  end
44
 
45
+ def handle_call({:borrow_worker}, _from, %{registry_pid: registry_pid} = state)
46
+ when is_pid(registry_pid) do
47
+ {:ok, worker} = ClientRegistry.borrow_worker(registry_pid)
48
+ Process.send(self(), :reset_idle_death, [])
49
+ {:reply, worker, state}
50
+ end
51
+
52
  def handle_call(_msg, _from, state) do
53
  {:reply, :ok, state}
54
  end
55
 
56
+ def handle_cast({:return_worker, pid}, %{registry_pid: registry_pid} = state)
57
+ when is_pid(pid) and is_pid(registry_pid) do
58
+ ClientRegistry.return_worker(registry_pid, pid)
59
+ {:noreply, state}
60
+ end
61
+
62
  def handle_cast(_msg, state) do
63
  {:noreply, state}
64
  end
lib/srh/redis/client_registry.ex CHANGED
@@ -10,7 +10,8 @@ defmodule Srh.Redis.ClientRegistry do
10
  :ok,
11
  %{
12
  worker_pids: [],
13
- last_worker_index: 0
 
14
  }
15
  }
16
  end
@@ -19,6 +20,14 @@ defmodule Srh.Redis.ClientRegistry do
19
  GenServer.call(registry, {:find_worker})
20
  end
21
 
 
 
 
 
 
 
 
 
22
  def add_worker(registry, pid) do
23
  GenServer.cast(registry, {:add_worker, pid})
24
  end
@@ -27,25 +36,31 @@ defmodule Srh.Redis.ClientRegistry do
27
  GenServer.cast(registry, {:destroy_workers})
28
  end
29
 
30
- def handle_call({:find_worker}, _from, state) do
31
- case length(state.worker_pids) do
32
- 0 ->
33
- {:reply, {:error, :none_available}, state}
34
-
35
- len ->
36
- target = state.last_worker_index + 1
37
-
38
- corrected_target =
39
- case target >= len do
40
- true -> 0
41
- false -> target
42
- end
43
-
44
- {:reply, {:ok, Enum.at(state.worker_pids, corrected_target)},
45
- %{state | last_worker_index: corrected_target}}
 
46
  end
47
  end
48
 
 
 
 
 
 
49
  def handle_call(_msg, _from, state) do
50
  {:reply, :ok, state}
51
  end
@@ -72,6 +87,12 @@ defmodule Srh.Redis.ClientRegistry do
72
  {:noreply, %{state | worker_pids: [], last_worker_index: 0}}
73
  end
74
 
 
 
 
 
 
 
75
  def handle_cast(_msg, state) do
76
  {:noreply, state}
77
  end
@@ -83,4 +104,29 @@ defmodule Srh.Redis.ClientRegistry do
83
  def handle_info(_msg, state) do
84
  {:noreply, state}
85
  end
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  end
 
10
  :ok,
11
  %{
12
  worker_pids: [],
13
+ last_worker_index: 0,
14
+ currently_borrowed_pids: []
15
  }
16
  }
17
  end
 
20
  GenServer.call(registry, {:find_worker})
21
  end
22
 
23
+ def borrow_worker(registry) do
24
+ GenServer.call(registry, {:borrow_worker})
25
+ end
26
+
27
+ def return_worker(registry, pid) do
28
+ GenServer.cast(registry, {:return_worker, pid})
29
+ end
30
+
31
  def add_worker(registry, pid) do
32
  GenServer.cast(registry, {:add_worker, pid})
33
  end
 
36
  GenServer.cast(registry, {:destroy_workers})
37
  end
38
 
39
+ def handle_call({:borrow_worker}, _from, state) do
40
+ case do_find_worker(state) do
41
+ {{:error, msg}, state_update} ->
42
+ {:reply, {:error, msg}, state_update}
43
+
44
+ {{:ok, pid}, state_update} ->
45
+ # We want to put this pid into the borrowed pids state list
46
+ {
47
+ :reply,
48
+ {:ok, pid},
49
+ %{
50
+ state_update
51
+ | currently_borrowed_pids:
52
+ [pid | state_update.currently_borrowed_pids]
53
+ |> Enum.uniq()
54
+ }
55
+ }
56
  end
57
  end
58
 
59
+ def handle_call({:find_worker}, _from, state) do
60
+ {res, state_update} = do_find_worker(state)
61
+ {:reply, res, state_update}
62
+ end
63
+
64
  def handle_call(_msg, _from, state) do
65
  {:reply, :ok, state}
66
  end
 
87
  {:noreply, %{state | worker_pids: [], last_worker_index: 0}}
88
  end
89
 
90
+ def handle_cast({:return_worker, pid}, state) do
91
+ # Remove it from the borrowed array
92
+ {:noreply,
93
+ %{state | currently_borrowed_pids: List.delete(state.currently_borrowed_pids, pid)}}
94
+ end
95
+
96
  def handle_cast(_msg, state) do
97
  {:noreply, state}
98
  end
 
104
  def handle_info(_msg, state) do
105
  {:noreply, state}
106
  end
107
+
108
+ defp do_find_worker(state) do
109
+ filtered_pids =
110
+ state.worker_pids
111
+ |> Enum.filter(&(!Enum.member?(state.currently_borrowed_pids, &1)))
112
+
113
+ case length(filtered_pids) do
114
+ 0 ->
115
+ {{:error, :none_available}, state}
116
+
117
+ len ->
118
+ target = state.last_worker_index + 1
119
+
120
+ corrected_target =
121
+ case target >= len do
122
+ true -> 0
123
+ false -> target
124
+ end
125
+
126
+ {
127
+ {:ok, Enum.at(state.worker_pids, corrected_target)},
128
+ %{state | last_worker_index: corrected_target}
129
+ }
130
+ end
131
+ end
132
  end