Create custom database type for state sets

This commit is contained in:
Pim Kunis 2021-09-14 21:10:57 +02:00
parent 731143775d
commit f94c156f95
4 changed files with 125 additions and 111 deletions

View file

@ -6,8 +6,6 @@ defmodule Architex.RoomServer do
The RoomServers are supervised by a DynamicSupervisor RoomServer.Supervisor.
"""
@typep t :: map()
use GenServer
import Ecto.Query
@ -25,7 +23,7 @@ defmodule Architex.RoomServer do
Alias
}
alias Architex.Types.UserId
alias Architex.Types.{UserId, StateSet}
alias Architex.StateResolution.Authorization
alias ArchitexWeb.Client.Request.{CreateRoom, Kick, Ban}
@ -49,8 +47,13 @@ defmodule Architex.RoomServer do
@spec get_room_server(String.t()) :: {:error, :not_found} | DynamicSupervisor.on_start_child()
def get_room_server(room_id) do
# TODO: Might be wise to use a transaction here to prevent race conditions.
case Repo.one(from r in Room, where: r.id == ^room_id) do
%Room{state: serialized_state_set} = room ->
query =
Room
|> where([r], r.id == ^room_id)
|> select([:id, :forward_extremities, :state_set, :visibility])
case Repo.one(query) do
%Room{} = room ->
case Registry.lookup(@registry, room_id) do
[{pid, _}] ->
{:ok, pid}
@ -58,8 +61,7 @@ defmodule Architex.RoomServer do
[] ->
opts = [
name: {:via, Registry, {@registry, room_id}},
room: room,
serialized_state_set: serialized_state_set
room: room
]
DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts})
@ -210,19 +212,7 @@ defmodule Architex.RoomServer do
@impl true
def init(opts) do
room = Keyword.fetch!(opts, :room)
serialized_state_set = Keyword.fetch!(opts, :serialized_state_set)
state_event_ids = Enum.map(serialized_state_set, fn [_, _, event_id] -> event_id end)
state_set =
Event
|> where([e], e.id in ^state_event_ids)
|> Repo.all()
|> Enum.into(%{}, fn %Event{type: type, state_key: state_key} = event ->
{{type, state_key}, event}
end)
{:ok, %{room: room, state_set: state_set}}
{:ok, %{room: Keyword.fetch!(opts, :room)}}
end
@impl true
@ -242,9 +232,9 @@ defmodule Architex.RoomServer do
{:ok, alias_} ->
events = create_room_events(room, account, request, alias_)
case Repo.transaction(process_events(room, %{}, events)) do
{:ok, {state_set, room}} ->
{:reply, {:ok, room_id}, %{state | state_set: state_set, room: room}}
case Repo.transaction(process_events(room, events)) do
{:ok, room} ->
{:reply, {:ok, room_id}, %{state | room: room}}
{:error, reason} ->
{:reply, {:error, reason}, state}
@ -258,7 +248,7 @@ defmodule Architex.RoomServer do
end
end
def handle_call({:server_in_room?, domain}, _from, %{state_set: state_set} = state) do
def handle_call({:server_in_room?, domain}, _from, %{room: %Room{state_set: state_set}} = state) do
result =
Enum.any?(state_set, fn
{{"m.room.member", user_id}, %Event{content: %{"membership" => "join"}}} ->
@ -314,12 +304,12 @@ defmodule Architex.RoomServer do
def handle_call(
{:invite, account, user_id, avatar_url, displayname},
_from,
%{room: room, state_set: state_set} = state
%{room: room} = state
) do
invite_event = Event.Invite.new(room, account, user_id, avatar_url, displayname)
case Repo.transaction(process_event(room, state_set, invite_event)) do
{:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}}
case Repo.transaction(process_event(room, invite_event)) do
{:ok, {room, _}} -> {:reply, :ok, %{state | room: room}}
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
@ -327,24 +317,24 @@ defmodule Architex.RoomServer do
def handle_call(
{:join, account},
_from,
%{room: %Room{id: room_id} = room, state_set: state_set} = state
%{room: %Room{id: room_id} = room} = state
) do
join_event = Event.Join.new(room, account)
case Repo.transaction(process_event(room, state_set, join_event)) do
{:ok, {state_set, room, _}} ->
{:reply, {:ok, room_id}, %{state | state_set: state_set, room: room}}
case Repo.transaction(process_event(room, join_event)) do
{:ok, {room, _}} ->
{:reply, {:ok, room_id}, %{state | room: room}}
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end
def handle_call({:leave, account}, _from, %{room: room, state_set: state_set} = state) do
def handle_call({:leave, account}, _from, %{room: room} = state) do
leave_event = Event.Leave.new(room, account)
case Repo.transaction(process_event(room, state_set, leave_event)) do
{:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}}
case Repo.transaction(process_event(room, leave_event)) do
{:ok, {room, _}} -> {:reply, :ok, %{state | room: room}}
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
@ -352,13 +342,13 @@ defmodule Architex.RoomServer do
def handle_call(
{:kick, account, %Kick{user_id: user_id, reason: reason}, avatar_url, displayname},
_from,
%{room: room, state_set: state_set} = state
%{room: room} = state
) do
kick_event =
Event.Kick.new(room, account, to_string(user_id), avatar_url, displayname, reason)
case Repo.transaction(process_event(room, state_set, kick_event)) do
{:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}}
case Repo.transaction(process_event(room, kick_event)) do
{:ok, {room, _}} -> {:reply, :ok, %{state | room: room}}
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
@ -366,12 +356,12 @@ defmodule Architex.RoomServer do
def handle_call(
{:ban, account, %Ban{user_id: user_id, reason: reason}, avatar_url, displayname},
_from,
%{room: room, state_set: state_set} = state
%{room: room} = state
) do
ban_event = Event.Ban.new(room, account, to_string(user_id), avatar_url, displayname, reason)
case Repo.transaction(process_event(room, state_set, ban_event)) do
{:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}}
case Repo.transaction(process_event(room, ban_event)) do
{:ok, {room, _}} -> {:reply, :ok, %{state | room: room}}
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
@ -379,12 +369,12 @@ defmodule Architex.RoomServer do
def handle_call(
{:unban, account, user_id, avatar_url, displayname},
_from,
%{room: room, state_set: state_set} = state
%{room: room} = state
) do
unban_event = Event.Unban.new(room, account, user_id, avatar_url, displayname)
case Repo.transaction(process_event(room, state_set, unban_event)) do
{:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}}
case Repo.transaction(process_event(room, unban_event)) do
{:ok, {room, _}} -> {:reply, :ok, %{state | room: room}}
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
@ -392,7 +382,7 @@ defmodule Architex.RoomServer do
def handle_call(
{:set_visibility, account, visibility},
_from,
%{room: room, state_set: state_set} = state
%{room: %Room{state_set: state_set} = room} = state
) do
case state_set do
%{{"m.room.create", ""} => %Event{content: %{"creator" => creator}}} ->
@ -412,13 +402,13 @@ defmodule Architex.RoomServer do
def handle_call(
{:send_message_event, account, device, event_type, content, txn_id},
_from,
%{room: room, state_set: state_set} = state
%{room: room} = state
) do
message_event = Event.custom_event(room, account, event_type, content)
case Repo.transaction(process_event_with_txn(state_set, room, device, message_event, txn_id)) do
{:ok, {state_set, room, event_id}} ->
{:reply, {:ok, event_id}, %{state | state_set: state_set, room: room}}
case Repo.transaction(process_event_with_txn(room, device, message_event, txn_id)) do
{:ok, {room, event_id}} ->
{:reply, {:ok, event_id}, %{state | room: room}}
{:error, reason} ->
{:reply, {:error, reason}, state}
@ -428,20 +418,24 @@ defmodule Architex.RoomServer do
def handle_call(
{:send_state_event, account, event_type, content, state_key},
_from,
%{room: room, state_set: state_set} = state
%{room: room} = state
) do
state_event = Event.custom_state_event(room, account, event_type, content, state_key)
case Repo.transaction(process_event(room, state_set, state_event)) do
{:ok, {state_set, room, %Event{id: event_id}}} ->
{:reply, {:ok, event_id}, %{state | state_set: state_set, room: room}}
case Repo.transaction(process_event(room, state_event)) do
{:ok, {room, %Event{id: event_id}}} ->
{:reply, {:ok, event_id}, %{state | room: room}}
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end
def handle_call({:get_current_state, account}, _from, %{state_set: state_set} = state) do
def handle_call(
{:get_current_state, account},
_from,
%{room: %Room{state_set: state_set}} = state
) do
mxid = Account.get_mxid(account)
case state_set[{"m.room.member", mxid}] do
@ -466,7 +460,7 @@ defmodule Architex.RoomServer do
def handle_call(
{:get_state_event, account, event_type, state_key},
_from,
%{state_set: state_set} = state
%{room: %Room{state_set: state_set}} = state
) do
mxid = Account.get_mxid(account)
@ -491,10 +485,9 @@ defmodule Architex.RoomServer do
end
end
@spec process_event_with_txn(t(), Room.t(), Device.t(), %Event{}, String.t()) ::
(() -> {t(), Room.t(), String.t()} | {:error, atom()})
@spec process_event_with_txn(Room.t(), Device.t(), %Event{}, String.t()) ::
(() -> {Room.t(), String.t()} | {:error, atom()})
defp process_event_with_txn(
state_set,
room,
%Device{nid: device_nid} = device,
message_event,
@ -507,53 +500,42 @@ defmodule Architex.RoomServer do
where: dt.txn_id == ^txn_id and dt.device_nid == ^device_nid
) do
%DeviceTransaction{event_id: event_id} ->
{state_set, room, event_id}
{room, event_id}
nil ->
with {state_set, room, %Event{id: event_id}} <-
process_event(room, state_set, message_event).() do
with {room, %Event{id: event_id}} <- process_event(room, message_event).() do
# Mark this transaction as done.
Ecto.build_assoc(device, :device_transactions, txn_id: txn_id, event_id: event_id)
|> Repo.insert!()
{state_set, room, event_id}
{room, event_id}
end
end
end
end
@spec process_event(Room.t(), t(), %Event{}) ::
(() -> {t(), Room.t(), Event.t()} | {:error, atom()})
defp process_event(room, state_set, event) do
@spec process_event(Room.t(), %Event{}) :: (() -> {Room.t(), Event.t()} | {:error, atom()})
defp process_event(room, event) do
fn ->
case finalize_and_process_event(event, state_set, room) do
{:ok, state_set, room, event} ->
_ = update_room_state_set(room, state_set)
{state_set, room, event}
{:error, reason} ->
Repo.rollback(reason)
case finalize_and_process_event(event, room) do
{:ok, room, event} -> {room, event}
{:error, reason} -> Repo.rollback(reason)
end
end
end
@spec process_events(Room.t(), t(), [%Event{}]) ::
(() -> {t(), Room.t()} | {:error, atom()})
defp process_events(room, state_set, events) do
@spec process_events(Room.t(), [%Event{}]) :: (() -> Room.t() | {:error, atom()})
defp process_events(room, events) do
fn ->
Enum.reduce_while(events, {state_set, room}, fn event, {state_set, room} ->
case finalize_and_process_event(event, state_set, room) do
{:ok, state_set, room, _} -> {:cont, {state_set, room}}
Enum.reduce_while(events, room, fn event, room ->
case finalize_and_process_event(event, room) do
{:ok, room, _} -> {:cont, room}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
|> then(fn
{:error, reason} ->
Repo.rollback(reason)
{state_set, room} ->
_ = update_room_state_set(room, state_set)
{state_set, room}
{:error, reason} -> Repo.rollback(reason)
room -> room
end)
end
end
@ -618,20 +600,6 @@ defmodule Architex.RoomServer do
preset_events ++ initial_state_events ++ name_and_topic_events ++ invite_events
end
# Update the given room in the database with the given state set.
@spec update_room_state_set(Room.t(), t()) :: Room.t()
defp update_room_state_set(room, state_set) do
# TODO: We might as well hold state in the Room struct,
# instead of the state_set state.
# Create custom type for this.
serialized_state_set =
Enum.map(state_set, fn {{type, state_key}, %Event{id: event_id}} ->
[type, state_key, event_id]
end)
Repo.update!(change(room, state: serialized_state_set))
end
# Get the events for room creation as dictated by the given preset.
@spec room_creation_preset(Account.t(), String.t() | nil, Room.t()) :: [%Event{}]
defp room_creation_preset(account, nil, %Room{visibility: visibility} = room) do
@ -687,12 +655,11 @@ defmodule Architex.RoomServer do
# - Content hash
# - Event ID
# - Signature
@spec finalize_and_process_event(%Event{}, t(), Room.t()) ::
{:ok, t(), Room.t(), Event.t()} | {:error, atom()}
@spec finalize_and_process_event(%Event{}, Room.t()) ::
{:ok, Room.t(), Event.t()} | {:error, atom()}
defp finalize_and_process_event(
event,
state_set,
%Room{forward_extremities: forward_extremities} = room
%Room{forward_extremities: forward_extremities, state_set: state_set} = room
) do
event =
event
@ -701,7 +668,7 @@ defmodule Architex.RoomServer do
|> Map.put(:depth, get_depth(forward_extremities))
case Event.post_process(event) do
{:ok, event} -> authenticate_and_process_event(event, state_set, room)
{:ok, event} -> authenticate_and_process_event(event, room)
_ -> {:error, :event_creation}
end
end
@ -716,7 +683,7 @@ defmodule Architex.RoomServer do
end
# Get the auth events for an events.
@spec auth_events_for_event(%Event{}, t()) :: [Event.t()]
@spec auth_events_for_event(%Event{}, StateSet.t()) :: [Event.t()]
defp auth_events_for_event(%Event{type: "m.room.create"}, _), do: []
defp auth_events_for_event(
@ -768,9 +735,9 @@ defmodule Architex.RoomServer do
# Authenticate and insert a new event using state resolution.
# Implements the checks as described in the
# [Matrix docs](https://matrix.org/docs/spec/server_server/latest#checks-performed-on-receipt-of-a-pdu).
@spec authenticate_and_process_event(Event.t(), t(), Room.t()) ::
{:ok, t(), Room.t(), Event.t()} | {:error, atom()}
defp authenticate_and_process_event(event, current_state_set, room) do
@spec authenticate_and_process_event(Event.t(), Room.t()) ::
{:ok, Room.t(), Event.t()} | {:error, atom()}
defp authenticate_and_process_event(event, %Room{state_set: current_state_set} = room) do
# TODO: Correctly handle soft fails.
# Check the following things:
# 1. TODO: Is a valid event, otherwise it is dropped.
@ -788,8 +755,9 @@ defmodule Architex.RoomServer do
event = Repo.insert!(event)
state_set = StateResolution.resolve_forward_extremities(event)
:ok = update_membership(room, state_set)
room = Repo.update!(change(room, state_set: state_set))
{:ok, state_set, room, event}
{:ok, room, event}
else
_ -> {:error, :authorization}
end
@ -800,7 +768,7 @@ defmodule Architex.RoomServer do
# could access rooms they are not allowed to. Then again, maybe we should perform
# the "normal" authorization flow for local users as well, and treat the Membership
# table only as informational.
@spec update_membership(Room.t(), t()) :: :ok
@spec update_membership(Room.t(), StateSet.t()) :: :ok
defp update_membership(%Room{id: room_id}, state_set) do
server_name = Architex.server_name()

View file

@ -5,18 +5,19 @@ defmodule Architex.Room do
import Ecto.Query
alias Architex.{Repo, Room, Event, Alias, RoomServer, Account}
alias Architex.Types.StateSet
alias ArchitexWeb.Client.Request.{CreateRoom, Messages}
@type t :: %__MODULE__{
visibility: :public | :private,
state: list(list(String.t())),
state_set: StateSet.t(),
forward_extremities: list(String.t())
}
@primary_key {:id, :string, []}
schema "rooms" do
field :visibility, Ecto.Enum, values: [:public, :private]
field :state, {:array, {:array, :string}}
field :state_set, StateSet, load_in_query: false
field :forward_extremities, {:array, :string}
has_many :events, Event, foreign_key: :room_id
has_many :aliases, Alias, foreign_key: :room_id

View file

@ -0,0 +1,45 @@
defmodule Architex.Types.StateSet do
use Ecto.Type
import Ecto.Query
alias Architex.{Repo, Event}
@type t :: %{optional({String.t(), String.t()}) => Event.t()}
def type(), do: {:array, :string}
def cast(_), do: :error
def load(event_ids) when is_list(event_ids) do
events =
Event
|> where([e], e.id in ^event_ids)
|> Repo.all()
|> IO.inspect()
if length(events) == length(event_ids) do
state_set =
Enum.into(events, %{}, fn %Event{type: type, state_key: state_key} = event ->
{{type, state_key}, event}
end)
{:ok, state_set}
else
:error
end
end
def load(_), do: :error
def dump(state_set) when is_map(state_set) do
dumped =
Enum.map(state_set, fn {_, %Event{id: event_id}} ->
event_id
end)
{:ok, dumped}
end
def dump(_), do: :error
end

View file

@ -14,7 +14,7 @@ defmodule Architex.Repo.Migrations.CreateInitialTables do
create table(:rooms, primary_key: false) do
add :id, :string, primary_key: true
add :state, {:array, {:array, :string}}, default: [], null: false
add :state_set, {:array, :string}, default: [], null: false
add :forward_extremities, {:array, :string}, default: [], null: false
add :visibility, :string, null: false, default: "public"
end