From f94c156f950ddea675cf7d592974b82ff1b63ff1 Mon Sep 17 00:00:00 2001 From: Pim Kunis Date: Tue, 14 Sep 2021 21:10:57 +0200 Subject: [PATCH] Create custom database type for state sets --- lib/architex/room_server.ex | 184 ++++++++---------- lib/architex/schema/room.ex | 5 +- lib/architex/types/state_set.ex | 45 +++++ .../20210830160818_create_initial_tables.exs | 2 +- 4 files changed, 125 insertions(+), 111 deletions(-) create mode 100644 lib/architex/types/state_set.ex diff --git a/lib/architex/room_server.ex b/lib/architex/room_server.ex index 31c8d6f..5214ccc 100644 --- a/lib/architex/room_server.ex +++ b/lib/architex/room_server.ex @@ -6,8 +6,6 @@ defmodule Architex.RoomServer do The RoomServers are supervised by a DynamicSupervisor RoomServer.Supervisor. """ - @typep t :: map() - use GenServer import Ecto.Query @@ -25,7 +23,7 @@ defmodule Architex.RoomServer do Alias } - alias Architex.Types.UserId + alias Architex.Types.{UserId, StateSet} alias Architex.StateResolution.Authorization alias ArchitexWeb.Client.Request.{CreateRoom, Kick, Ban} @@ -49,8 +47,13 @@ defmodule Architex.RoomServer do @spec get_room_server(String.t()) :: {:error, :not_found} | DynamicSupervisor.on_start_child() def get_room_server(room_id) do # TODO: Might be wise to use a transaction here to prevent race conditions. - case Repo.one(from r in Room, where: r.id == ^room_id) do - %Room{state: serialized_state_set} = room -> + query = + Room + |> where([r], r.id == ^room_id) + |> select([:id, :forward_extremities, :state_set, :visibility]) + + case Repo.one(query) do + %Room{} = room -> case Registry.lookup(@registry, room_id) do [{pid, _}] -> {:ok, pid} @@ -58,8 +61,7 @@ defmodule Architex.RoomServer do [] -> opts = [ name: {:via, Registry, {@registry, room_id}}, - room: room, - serialized_state_set: serialized_state_set + room: room ] DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts}) @@ -210,19 +212,7 @@ defmodule Architex.RoomServer do @impl true def init(opts) do - room = Keyword.fetch!(opts, :room) - serialized_state_set = Keyword.fetch!(opts, :serialized_state_set) - state_event_ids = Enum.map(serialized_state_set, fn [_, _, event_id] -> event_id end) - - state_set = - Event - |> where([e], e.id in ^state_event_ids) - |> Repo.all() - |> Enum.into(%{}, fn %Event{type: type, state_key: state_key} = event -> - {{type, state_key}, event} - end) - - {:ok, %{room: room, state_set: state_set}} + {:ok, %{room: Keyword.fetch!(opts, :room)}} end @impl true @@ -242,9 +232,9 @@ defmodule Architex.RoomServer do {:ok, alias_} -> events = create_room_events(room, account, request, alias_) - case Repo.transaction(process_events(room, %{}, events)) do - {:ok, {state_set, room}} -> - {:reply, {:ok, room_id}, %{state | state_set: state_set, room: room}} + case Repo.transaction(process_events(room, events)) do + {:ok, room} -> + {:reply, {:ok, room_id}, %{state | room: room}} {:error, reason} -> {:reply, {:error, reason}, state} @@ -258,7 +248,7 @@ defmodule Architex.RoomServer do end end - def handle_call({:server_in_room?, domain}, _from, %{state_set: state_set} = state) do + def handle_call({:server_in_room?, domain}, _from, %{room: %Room{state_set: state_set}} = state) do result = Enum.any?(state_set, fn {{"m.room.member", user_id}, %Event{content: %{"membership" => "join"}}} -> @@ -314,12 +304,12 @@ defmodule Architex.RoomServer do def handle_call( {:invite, account, user_id, avatar_url, displayname}, _from, - %{room: room, state_set: state_set} = state + %{room: room} = state ) do invite_event = Event.Invite.new(room, account, user_id, avatar_url, displayname) - case Repo.transaction(process_event(room, state_set, invite_event)) do - {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} + case Repo.transaction(process_event(room, invite_event)) do + {:ok, {room, _}} -> {:reply, :ok, %{state | room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end @@ -327,24 +317,24 @@ defmodule Architex.RoomServer do def handle_call( {:join, account}, _from, - %{room: %Room{id: room_id} = room, state_set: state_set} = state + %{room: %Room{id: room_id} = room} = state ) do join_event = Event.Join.new(room, account) - case Repo.transaction(process_event(room, state_set, join_event)) do - {:ok, {state_set, room, _}} -> - {:reply, {:ok, room_id}, %{state | state_set: state_set, room: room}} + case Repo.transaction(process_event(room, join_event)) do + {:ok, {room, _}} -> + {:reply, {:ok, room_id}, %{state | room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end - def handle_call({:leave, account}, _from, %{room: room, state_set: state_set} = state) do + def handle_call({:leave, account}, _from, %{room: room} = state) do leave_event = Event.Leave.new(room, account) - case Repo.transaction(process_event(room, state_set, leave_event)) do - {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} + case Repo.transaction(process_event(room, leave_event)) do + {:ok, {room, _}} -> {:reply, :ok, %{state | room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end @@ -352,13 +342,13 @@ defmodule Architex.RoomServer do def handle_call( {:kick, account, %Kick{user_id: user_id, reason: reason}, avatar_url, displayname}, _from, - %{room: room, state_set: state_set} = state + %{room: room} = state ) do kick_event = Event.Kick.new(room, account, to_string(user_id), avatar_url, displayname, reason) - case Repo.transaction(process_event(room, state_set, kick_event)) do - {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} + case Repo.transaction(process_event(room, kick_event)) do + {:ok, {room, _}} -> {:reply, :ok, %{state | room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end @@ -366,12 +356,12 @@ defmodule Architex.RoomServer do def handle_call( {:ban, account, %Ban{user_id: user_id, reason: reason}, avatar_url, displayname}, _from, - %{room: room, state_set: state_set} = state + %{room: room} = state ) do ban_event = Event.Ban.new(room, account, to_string(user_id), avatar_url, displayname, reason) - case Repo.transaction(process_event(room, state_set, ban_event)) do - {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} + case Repo.transaction(process_event(room, ban_event)) do + {:ok, {room, _}} -> {:reply, :ok, %{state | room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end @@ -379,12 +369,12 @@ defmodule Architex.RoomServer do def handle_call( {:unban, account, user_id, avatar_url, displayname}, _from, - %{room: room, state_set: state_set} = state + %{room: room} = state ) do unban_event = Event.Unban.new(room, account, user_id, avatar_url, displayname) - case Repo.transaction(process_event(room, state_set, unban_event)) do - {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} + case Repo.transaction(process_event(room, unban_event)) do + {:ok, {room, _}} -> {:reply, :ok, %{state | room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end @@ -392,7 +382,7 @@ defmodule Architex.RoomServer do def handle_call( {:set_visibility, account, visibility}, _from, - %{room: room, state_set: state_set} = state + %{room: %Room{state_set: state_set} = room} = state ) do case state_set do %{{"m.room.create", ""} => %Event{content: %{"creator" => creator}}} -> @@ -412,13 +402,13 @@ defmodule Architex.RoomServer do def handle_call( {:send_message_event, account, device, event_type, content, txn_id}, _from, - %{room: room, state_set: state_set} = state + %{room: room} = state ) do message_event = Event.custom_event(room, account, event_type, content) - case Repo.transaction(process_event_with_txn(state_set, room, device, message_event, txn_id)) do - {:ok, {state_set, room, event_id}} -> - {:reply, {:ok, event_id}, %{state | state_set: state_set, room: room}} + case Repo.transaction(process_event_with_txn(room, device, message_event, txn_id)) do + {:ok, {room, event_id}} -> + {:reply, {:ok, event_id}, %{state | room: room}} {:error, reason} -> {:reply, {:error, reason}, state} @@ -428,20 +418,24 @@ defmodule Architex.RoomServer do def handle_call( {:send_state_event, account, event_type, content, state_key}, _from, - %{room: room, state_set: state_set} = state + %{room: room} = state ) do state_event = Event.custom_state_event(room, account, event_type, content, state_key) - case Repo.transaction(process_event(room, state_set, state_event)) do - {:ok, {state_set, room, %Event{id: event_id}}} -> - {:reply, {:ok, event_id}, %{state | state_set: state_set, room: room}} + case Repo.transaction(process_event(room, state_event)) do + {:ok, {room, %Event{id: event_id}}} -> + {:reply, {:ok, event_id}, %{state | room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end - def handle_call({:get_current_state, account}, _from, %{state_set: state_set} = state) do + def handle_call( + {:get_current_state, account}, + _from, + %{room: %Room{state_set: state_set}} = state + ) do mxid = Account.get_mxid(account) case state_set[{"m.room.member", mxid}] do @@ -466,7 +460,7 @@ defmodule Architex.RoomServer do def handle_call( {:get_state_event, account, event_type, state_key}, _from, - %{state_set: state_set} = state + %{room: %Room{state_set: state_set}} = state ) do mxid = Account.get_mxid(account) @@ -491,10 +485,9 @@ defmodule Architex.RoomServer do end end - @spec process_event_with_txn(t(), Room.t(), Device.t(), %Event{}, String.t()) :: - (() -> {t(), Room.t(), String.t()} | {:error, atom()}) + @spec process_event_with_txn(Room.t(), Device.t(), %Event{}, String.t()) :: + (() -> {Room.t(), String.t()} | {:error, atom()}) defp process_event_with_txn( - state_set, room, %Device{nid: device_nid} = device, message_event, @@ -507,53 +500,42 @@ defmodule Architex.RoomServer do where: dt.txn_id == ^txn_id and dt.device_nid == ^device_nid ) do %DeviceTransaction{event_id: event_id} -> - {state_set, room, event_id} + {room, event_id} nil -> - with {state_set, room, %Event{id: event_id}} <- - process_event(room, state_set, message_event).() do + with {room, %Event{id: event_id}} <- process_event(room, message_event).() do # Mark this transaction as done. Ecto.build_assoc(device, :device_transactions, txn_id: txn_id, event_id: event_id) |> Repo.insert!() - {state_set, room, event_id} + {room, event_id} end end end end - @spec process_event(Room.t(), t(), %Event{}) :: - (() -> {t(), Room.t(), Event.t()} | {:error, atom()}) - defp process_event(room, state_set, event) do + @spec process_event(Room.t(), %Event{}) :: (() -> {Room.t(), Event.t()} | {:error, atom()}) + defp process_event(room, event) do fn -> - case finalize_and_process_event(event, state_set, room) do - {:ok, state_set, room, event} -> - _ = update_room_state_set(room, state_set) - {state_set, room, event} - - {:error, reason} -> - Repo.rollback(reason) + case finalize_and_process_event(event, room) do + {:ok, room, event} -> {room, event} + {:error, reason} -> Repo.rollback(reason) end end end - @spec process_events(Room.t(), t(), [%Event{}]) :: - (() -> {t(), Room.t()} | {:error, atom()}) - defp process_events(room, state_set, events) do + @spec process_events(Room.t(), [%Event{}]) :: (() -> Room.t() | {:error, atom()}) + defp process_events(room, events) do fn -> - Enum.reduce_while(events, {state_set, room}, fn event, {state_set, room} -> - case finalize_and_process_event(event, state_set, room) do - {:ok, state_set, room, _} -> {:cont, {state_set, room}} + Enum.reduce_while(events, room, fn event, room -> + case finalize_and_process_event(event, room) do + {:ok, room, _} -> {:cont, room} {:error, reason} -> {:halt, {:error, reason}} end end) |> then(fn - {:error, reason} -> - Repo.rollback(reason) - - {state_set, room} -> - _ = update_room_state_set(room, state_set) - {state_set, room} + {:error, reason} -> Repo.rollback(reason) + room -> room end) end end @@ -618,20 +600,6 @@ defmodule Architex.RoomServer do preset_events ++ initial_state_events ++ name_and_topic_events ++ invite_events end - # Update the given room in the database with the given state set. - @spec update_room_state_set(Room.t(), t()) :: Room.t() - defp update_room_state_set(room, state_set) do - # TODO: We might as well hold state in the Room struct, - # instead of the state_set state. - # Create custom type for this. - serialized_state_set = - Enum.map(state_set, fn {{type, state_key}, %Event{id: event_id}} -> - [type, state_key, event_id] - end) - - Repo.update!(change(room, state: serialized_state_set)) - end - # Get the events for room creation as dictated by the given preset. @spec room_creation_preset(Account.t(), String.t() | nil, Room.t()) :: [%Event{}] defp room_creation_preset(account, nil, %Room{visibility: visibility} = room) do @@ -687,12 +655,11 @@ defmodule Architex.RoomServer do # - Content hash # - Event ID # - Signature - @spec finalize_and_process_event(%Event{}, t(), Room.t()) :: - {:ok, t(), Room.t(), Event.t()} | {:error, atom()} + @spec finalize_and_process_event(%Event{}, Room.t()) :: + {:ok, Room.t(), Event.t()} | {:error, atom()} defp finalize_and_process_event( event, - state_set, - %Room{forward_extremities: forward_extremities} = room + %Room{forward_extremities: forward_extremities, state_set: state_set} = room ) do event = event @@ -701,7 +668,7 @@ defmodule Architex.RoomServer do |> Map.put(:depth, get_depth(forward_extremities)) case Event.post_process(event) do - {:ok, event} -> authenticate_and_process_event(event, state_set, room) + {:ok, event} -> authenticate_and_process_event(event, room) _ -> {:error, :event_creation} end end @@ -716,7 +683,7 @@ defmodule Architex.RoomServer do end # Get the auth events for an events. - @spec auth_events_for_event(%Event{}, t()) :: [Event.t()] + @spec auth_events_for_event(%Event{}, StateSet.t()) :: [Event.t()] defp auth_events_for_event(%Event{type: "m.room.create"}, _), do: [] defp auth_events_for_event( @@ -768,9 +735,9 @@ defmodule Architex.RoomServer do # Authenticate and insert a new event using state resolution. # Implements the checks as described in the # [Matrix docs](https://matrix.org/docs/spec/server_server/latest#checks-performed-on-receipt-of-a-pdu). - @spec authenticate_and_process_event(Event.t(), t(), Room.t()) :: - {:ok, t(), Room.t(), Event.t()} | {:error, atom()} - defp authenticate_and_process_event(event, current_state_set, room) do + @spec authenticate_and_process_event(Event.t(), Room.t()) :: + {:ok, Room.t(), Event.t()} | {:error, atom()} + defp authenticate_and_process_event(event, %Room{state_set: current_state_set} = room) do # TODO: Correctly handle soft fails. # Check the following things: # 1. TODO: Is a valid event, otherwise it is dropped. @@ -788,8 +755,9 @@ defmodule Architex.RoomServer do event = Repo.insert!(event) state_set = StateResolution.resolve_forward_extremities(event) :ok = update_membership(room, state_set) + room = Repo.update!(change(room, state_set: state_set)) - {:ok, state_set, room, event} + {:ok, room, event} else _ -> {:error, :authorization} end @@ -800,7 +768,7 @@ defmodule Architex.RoomServer do # could access rooms they are not allowed to. Then again, maybe we should perform # the "normal" authorization flow for local users as well, and treat the Membership # table only as informational. - @spec update_membership(Room.t(), t()) :: :ok + @spec update_membership(Room.t(), StateSet.t()) :: :ok defp update_membership(%Room{id: room_id}, state_set) do server_name = Architex.server_name() diff --git a/lib/architex/schema/room.ex b/lib/architex/schema/room.ex index d07413f..e552b43 100644 --- a/lib/architex/schema/room.ex +++ b/lib/architex/schema/room.ex @@ -5,18 +5,19 @@ defmodule Architex.Room do import Ecto.Query alias Architex.{Repo, Room, Event, Alias, RoomServer, Account} + alias Architex.Types.StateSet alias ArchitexWeb.Client.Request.{CreateRoom, Messages} @type t :: %__MODULE__{ visibility: :public | :private, - state: list(list(String.t())), + state_set: StateSet.t(), forward_extremities: list(String.t()) } @primary_key {:id, :string, []} schema "rooms" do field :visibility, Ecto.Enum, values: [:public, :private] - field :state, {:array, {:array, :string}} + field :state_set, StateSet, load_in_query: false field :forward_extremities, {:array, :string} has_many :events, Event, foreign_key: :room_id has_many :aliases, Alias, foreign_key: :room_id diff --git a/lib/architex/types/state_set.ex b/lib/architex/types/state_set.ex new file mode 100644 index 0000000..6eab0c8 --- /dev/null +++ b/lib/architex/types/state_set.ex @@ -0,0 +1,45 @@ +defmodule Architex.Types.StateSet do + use Ecto.Type + + import Ecto.Query + + alias Architex.{Repo, Event} + + @type t :: %{optional({String.t(), String.t()}) => Event.t()} + + def type(), do: {:array, :string} + + def cast(_), do: :error + + def load(event_ids) when is_list(event_ids) do + events = + Event + |> where([e], e.id in ^event_ids) + |> Repo.all() + |> IO.inspect() + + if length(events) == length(event_ids) do + state_set = + Enum.into(events, %{}, fn %Event{type: type, state_key: state_key} = event -> + {{type, state_key}, event} + end) + + {:ok, state_set} + else + :error + end + end + + def load(_), do: :error + + def dump(state_set) when is_map(state_set) do + dumped = + Enum.map(state_set, fn {_, %Event{id: event_id}} -> + event_id + end) + + {:ok, dumped} + end + + def dump(_), do: :error +end diff --git a/priv/repo/migrations/20210830160818_create_initial_tables.exs b/priv/repo/migrations/20210830160818_create_initial_tables.exs index c8fae73..d9e2e84 100644 --- a/priv/repo/migrations/20210830160818_create_initial_tables.exs +++ b/priv/repo/migrations/20210830160818_create_initial_tables.exs @@ -14,7 +14,7 @@ defmodule Architex.Repo.Migrations.CreateInitialTables do create table(:rooms, primary_key: false) do add :id, :string, primary_key: true - add :state, {:array, {:array, :string}}, default: [], null: false + add :state_set, {:array, :string}, default: [], null: false add :forward_extremities, {:array, :string}, default: [], null: false add :visibility, :string, null: false, default: "public" end