Refactor room server

Serialize and save room state in database
Get room state from database when creating a room server
This commit is contained in:
Pim Kunis 2021-07-29 16:59:40 +02:00
parent 65368dc2d4
commit 9e02d5b95c
10 changed files with 122 additions and 146 deletions

View file

@ -6,6 +6,7 @@ Currently it is in a very early stage.
Some noteworthy contributions: Some noteworthy contributions:
* `lib/matrix_server/state_resolution.ex`: Implementation of version 2 of the Matrix state resolution algorithm. * `lib/matrix_server/state_resolution.ex`: Implementation of version 2 of the Matrix state resolution algorithm.
* `lib/matrix_server/state_resolution/authorization.ex`: Implementation of authorization rules for the state resolution algorithm.
* `lib/matrix_server/room_server.ex`: A GenServer that holds and manages the state of a room. * `lib/matrix_server/room_server.ex`: A GenServer that holds and manages the state of a room.
To run the server in development mode, run: To run the server in development mode, run:

View file

@ -28,7 +28,7 @@ defmodule MatrixServer.Event do
} }
end end
def create_room(room, %Account{localpart: localpart} = creator, room_version) do def create_room(room, %Account{localpart: localpart} = creator, room_version, auth_events \\ []) do
mxid = MatrixServer.get_mxid(localpart) mxid = MatrixServer.get_mxid(localpart)
%Event{ %Event{
@ -38,11 +38,12 @@ defmodule MatrixServer.Event do
content: %{ content: %{
"creator" => mxid, "creator" => mxid,
"room_version" => room_version || MatrixServer.default_room_version() "room_version" => room_version || MatrixServer.default_room_version()
} },
auth_events: auth_events
} }
end end
def join(room, %Account{localpart: localpart} = sender) do def join(room, %Account{localpart: localpart} = sender, auth_events \\ []) do
mxid = MatrixServer.get_mxid(localpart) mxid = MatrixServer.get_mxid(localpart)
%Event{ %Event{
@ -51,11 +52,12 @@ defmodule MatrixServer.Event do
state_key: mxid, state_key: mxid,
content: %{ content: %{
"membership" => "join" "membership" => "join"
} },
auth_events: auth_events
} }
end end
def power_levels(room, %Account{localpart: localpart} = sender) do def power_levels(room, %Account{localpart: localpart} = sender, auth_events \\ []) do
mxid = MatrixServer.get_mxid(localpart) mxid = MatrixServer.get_mxid(localpart)
%Event{ %Event{
@ -77,62 +79,68 @@ defmodule MatrixServer.Event do
"notifications" => %{ "notifications" => %{
"room" => 50 "room" => 50
} }
} },
auth_events: auth_events
} }
end end
def name(room, sender, name) do def name(room, sender, name, auth_events \\ []) do
%Event{ %Event{
new(room, sender) new(room, sender)
| type: "m.room.name", | type: "m.room.name",
state_key: "", state_key: "",
content: %{ content: %{
"name" => name "name" => name
} },
auth_events: auth_events
} }
end end
def topic(room, sender, topic) do def topic(room, sender, topic, auth_events \\ []) do
%Event{ %Event{
new(room, sender) new(room, sender)
| type: "m.room.topic", | type: "m.room.topic",
state_key: "", state_key: "",
content: %{ content: %{
"topic" => topic "topic" => topic
} },
auth_events: auth_events
} }
end end
def join_rules(room, sender, join_rule) do def join_rules(room, sender, join_rule, auth_events \\ []) do
%Event{ %Event{
new(room, sender) new(room, sender)
| type: "m.room.join_rules", | type: "m.room.join_rules",
state_key: "", state_key: "",
content: %{ content: %{
"join_rule" => join_rule "join_rule" => join_rule
} },
auth_events: auth_events
} }
end end
def history_visibility(room, sender, history_visibility) do def history_visibility(room, sender, history_visibility, auth_events \\ []) do
%Event{ %Event{
new(room, sender) new(room, sender)
| type: "m.room.history_visibility", | type: "m.room.history_visibility",
state_key: "", state_key: "",
content: %{ content: %{
"history_visibility" => history_visibility "history_visibility" => history_visibility
} },
auth_events: auth_events
} }
end end
def guest_access(room, sender, guest_access) do def guest_access(room, sender, guest_access, auth_events \\ []) do
%Event{ %Event{
new(room, sender) new(room, sender)
| type: "m.room.guest_access", | type: "m.room.guest_access",
state_key: "", state_key: "",
content: %{ content: %{
"guest_access" => guest_access "guest_access" => guest_access
} },
auth_events: auth_events
} }
end end

View file

@ -4,9 +4,9 @@ defmodule MatrixServer.QuickCheck do
alias MatrixServer.{Repo, Room, Account, RoomServer} alias MatrixServer.{Repo, Room, Account, RoomServer}
alias MatrixServerWeb.API.CreateRoom alias MatrixServerWeb.API.CreateRoom
def create_room do def create_room(name \\ nil, topic \\ nil) do
account = Repo.one!(from a in Account, limit: 1) account = Repo.one!(from a in Account, limit: 1)
input = %CreateRoom{} input = %CreateRoom{name: name, topic: topic}
%Room{id: room_id} = Repo.insert!(Room.create_changeset(input)) %Room{id: room_id} = Repo.insert!(Room.create_changeset(input))
{:ok, pid} = RoomServer.get_room_server(room_id) {:ok, pid} = RoomServer.get_room_server(room_id)
RoomServer.create_room(pid, account, input) RoomServer.create_room(pid, account, input)

View file

@ -10,7 +10,7 @@ defmodule MatrixServer.Room do
@primary_key {:id, :string, []} @primary_key {:id, :string, []}
schema "rooms" do schema "rooms" do
field :visibility, Ecto.Enum, values: [:public, :private] field :visibility, Ecto.Enum, values: [:public, :private]
field :state, :map field :state, {:array, {:array, :string}}
field :forward_extremities, {:array, :string} field :forward_extremities, {:array, :string}
has_many :events, Event, foreign_key: :event_id has_many :events, Event, foreign_key: :event_id
end end
@ -22,7 +22,7 @@ defmodule MatrixServer.Room do
def create_changeset(%CreateRoom{} = input) do def create_changeset(%CreateRoom{} = input) do
visibility = input.visibility || :public visibility = input.visibility || :public
%Room{id: generate_room_id(), forward_extremities: [], state: %{}} %Room{id: generate_room_id()}
|> changeset(%{visibility: visibility}) |> changeset(%{visibility: visibility})
end end

View file

@ -2,6 +2,7 @@ defmodule MatrixServer.RoomServer do
use GenServer use GenServer
import Ecto.Query import Ecto.Query
import Ecto.Changeset
alias MatrixServer.{Repo, Room, Event, StateResolution} alias MatrixServer.{Repo, Room, Event, StateResolution}
alias MatrixServerWeb.API.CreateRoom alias MatrixServerWeb.API.CreateRoom
@ -24,7 +25,7 @@ defmodule MatrixServer.RoomServer do
nil -> nil ->
{:error, :not_found} {:error, :not_found}
%Room{} -> %Room{state: serialized_state_set} ->
case Registry.lookup(@registry, room_id) do case Registry.lookup(@registry, room_id) do
[{pid, _}] -> [{pid, _}] ->
{:ok, pid} {:ok, pid}
@ -32,7 +33,8 @@ defmodule MatrixServer.RoomServer do
[] -> [] ->
opts = [ opts = [
name: {:via, Registry, {@registry, room_id}}, name: {:via, Registry, {@registry, room_id}},
room_id: room_id room_id: room_id,
serialized_state_set: serialized_state_set
] ]
DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts}) DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts})
@ -49,49 +51,65 @@ defmodule MatrixServer.RoomServer do
@impl true @impl true
def init(opts) do def init(opts) do
room_id = Keyword.fetch!(opts, :room_id) room_id = Keyword.fetch!(opts, :room_id)
serialized_state_set = Keyword.fetch!(opts, :serialized_state_set)
state_event_ids = Enum.map(serialized_state_set, fn [_, _, event_id] -> event_id end)
{:ok, %{room_id: room_id, state_set: %{}}} state_set =
Event
|> where([e], e.event_id in ^state_event_ids)
|> Repo.all()
|> Enum.into(%{}, fn %Event{type: type, state_key: state_key} = event ->
{{type, state_key}, event}
end)
{:ok, %{room_id: room_id, state_set: state_set}}
end end
@impl true @impl true
def handle_call({:create_room, account, input}, _from, %{room_id: room_id} = state) do def handle_call(
{:create_room, account,
%CreateRoom{room_version: room_version, name: name, topic: topic, preset: preset}},
_from,
%{room_id: room_id} = state
) do
result = result =
Repo.transaction(fn -> Repo.transaction(fn ->
room = Repo.one!(from r in Room, where: r.id == ^room_id)
create_room = Event.create_room(room, account, room_version)
join_creator = Event.join(room, account, [create_room.event_id])
pls = Event.power_levels(room, account, [create_room.event_id, join_creator.event_id])
auth_events = [create_room.event_id, join_creator.event_id, pls.event_id]
name_event = if name, do: Event.name(room, account, name, auth_events)
topic_event = if topic, do: Event.topic(room, account, topic, auth_events)
# TODO: power_level_content_override, initial_state, invite, invite_3pid # TODO: power_level_content_override, initial_state, invite, invite_3pid
with room <- Repo.one!(from r in Room, where: r.id == ^room_id), events =
{:ok, create_room_id, state_set, room} <- [create_room, join_creator, pls] ++
room_creation_create_room(account, input, room), room_creation_preset(account, preset, room, auth_events) ++
{:ok, join_creator_id, state_set, room} <- [name_event, topic_event]
room_creation_join_creator(account, room, state_set, [create_room_id]),
{:ok, pl_id, state_set, room} <- result =
room_creation_power_levels( events
account, |> Enum.reject(&Kernel.is_nil/1)
room, |> Enum.reduce_while({%{}, room}, fn event, {state_set, room} ->
state_set, case verify_and_insert_event(event, state_set, room) do
[create_room_id, join_creator_id] {:ok, state_set, room} -> {:cont, {state_set, room}}
), {:error, reason} -> {:halt, {:error, reason}}
{:ok, _, state_set, room} <- end
room_creation_preset(account, input, room, state_set, [ end)
create_room_id,
join_creator_id, case result do
pl_id {:error, reason} ->
]),
{:ok, _, state_set, room} <-
room_creation_name(account, input, room, state_set, [
create_room_id,
join_creator_id,
pl_id
]),
{:ok, _, state_set, _} <-
room_creation_topic(account, input, room, state_set, [
create_room_id,
join_creator_id,
pl_id
]) do
state_set
else
reason ->
Repo.rollback(reason) Repo.rollback(reason)
{state_set, room} ->
serialized_state_set =
Enum.map(state_set, fn {{type, state_key}, event} ->
[type, state_key, event.event_id]
end)
Repo.update!(change(room, state: serialized_state_set))
state_set
end end
end) end)
@ -101,46 +119,19 @@ defmodule MatrixServer.RoomServer do
end end
end end
defp room_creation_create_room(account, %CreateRoom{room_version: room_version}, room) do
Event.create_room(room, account, room_version)
|> verify_and_insert_event(%{}, room)
end
defp room_creation_join_creator(account, room, state_set, auth_events) do
Event.join(room, account)
|> Map.put(:auth_events, auth_events)
|> verify_and_insert_event(state_set, room)
end
defp room_creation_power_levels(account, room, state_set, auth_events) do
Event.power_levels(room, account)
|> Map.put(:auth_events, auth_events)
|> verify_and_insert_event(state_set, room)
end
# TODO: trusted_private_chat: # TODO: trusted_private_chat:
# All invitees are given the same power level as the room creator. # All invitees are given the same power level as the room creator.
defp room_creation_preset( defp room_creation_preset(account, nil, %Room{visibility: visibility} = room, auth_events) do
account,
%CreateRoom{preset: nil},
%Room{visibility: visibility} = room,
state_set,
auth_events
) do
preset = preset =
case visibility do case visibility do
:public -> "public_chat" :public -> "public_chat"
:private -> "private_chat" :private -> "private_chat"
end end
room_creation_preset(account, preset, room, state_set, auth_events) room_creation_preset(account, preset, room, auth_events)
end end
defp room_creation_preset(account, %CreateRoom{preset: preset}, room, state_set, auth_events) do defp room_creation_preset(account, preset, room, auth_events) do
room_creation_preset(account, preset, room, state_set, auth_events)
end
defp room_creation_preset(account, preset, room, state_set, auth_events) do
{join_rule, his_vis, guest_access} = {join_rule, his_vis, guest_access} =
case preset do case preset do
"private_chat" -> {"invite", "shared", "can_join"} "private_chat" -> {"invite", "shared", "can_join"}
@ -148,53 +139,15 @@ defmodule MatrixServer.RoomServer do
"public_chat" -> {"public", "shared", "forbidden"} "public_chat" -> {"public", "shared", "forbidden"}
end end
with {:ok, _, _, _} <- [
room_creation_join_rules(account, join_rule, room, state_set, auth_events), Event.join_rules(room, account, join_rule, auth_events),
{:ok, _, _, _} <- room_creation_his_vis(account, his_vis, room, state_set, auth_events) do Event.history_visibility(room, account, his_vis, auth_events),
room_creation_guest_access(account, guest_access, room, state_set, auth_events) Event.guest_access(room, account, guest_access, auth_events)
end ]
end
defp room_creation_join_rules(account, join_rule, room, state_set, auth_events) do
Event.join_rules(room, account, join_rule)
|> Map.put(:auth_events, auth_events)
|> verify_and_insert_event(state_set, room)
end
defp room_creation_his_vis(account, his_vis, room, state_set, auth_events) do
Event.history_visibility(room, account, his_vis)
|> Map.put(:auth_events, auth_events)
|> verify_and_insert_event(state_set, room)
end
defp room_creation_guest_access(account, guest_access, room, state_set, auth_events) do
Event.guest_access(room, account, guest_access)
|> Map.put(:auth_events, auth_events)
|> verify_and_insert_event(state_set, room)
end
defp room_creation_name(_, %CreateRoom{name: nil}, room, state_set, _) do
{:ok, nil, state_set, room}
end
defp room_creation_name(account, %CreateRoom{name: name}, room, state_set, auth_events) do
Event.name(room, account, name)
|> Map.put(:auth_events, auth_events)
|> verify_and_insert_event(state_set, room)
end
defp room_creation_topic(_, %CreateRoom{topic: nil}, room, state_set, _) do
{:ok, nil, state_set, room}
end
defp room_creation_topic(account, %CreateRoom{topic: topic}, room, state_set, auth_events) do
Event.topic(room, account, topic)
|> Map.put(:auth_events, auth_events)
|> verify_and_insert_event(state_set, room)
end end
defp verify_and_insert_event( defp verify_and_insert_event(
%Event{event_id: event_id} = event, event,
current_state_set, current_state_set,
%Room{forward_extremities: forward_extremities} = room %Room{forward_extremities: forward_extremities} = room
) do ) do
@ -215,9 +168,9 @@ defmodule MatrixServer.RoomServer do
if Authorization.authorized?(event, current_state_set) do if Authorization.authorized?(event, current_state_set) do
# We assume here that the event is always a forward extremity. # We assume here that the event is always a forward extremity.
room = Room.update_forward_extremities(event, room) room = Room.update_forward_extremities(event, room)
{:ok, event} = Repo.insert(event) event = Repo.insert!(event)
state_set = StateResolution.resolve_forward_extremities(event) state_set = StateResolution.resolve_forward_extremities(event)
{:ok, event_id, state_set, room} {:ok, state_set, room}
else else
{:error, :soft_failed} {:error, :soft_failed}
end end

View file

@ -264,9 +264,9 @@ defmodule MatrixServer.StateResolution do
end end
def update_state_set( def update_state_set(
%Event{type: event_type, state_key: state_key} = event, %Event{type: event_type, state_key: state_key} = event,
state_set state_set
) do ) do
Map.put(state_set, {event_type, state_key}, event) Map.put(state_set, {event_type, state_key}, event)
end end

View file

@ -44,7 +44,7 @@ defmodule MatrixServerWeb.AuthController do
def register(conn, %{"auth" => _}) do def register(conn, %{"auth" => _}) do
# Other login types are unsupported for now. # Other login types are unsupported for now.
put_error(conn, :forbidden) put_error(conn, :unrecognized, "Only m.login.dummy is supported currently.")
end end
def register(conn, _params) do def register(conn, _params) do
@ -87,8 +87,11 @@ defmodule MatrixServerWeb.AuthController do
|> put_status(200) |> put_status(200)
|> json(data) |> json(data)
{:error, error} -> {:error, error} when is_atom(error) ->
put_error(conn, error) put_error(conn, error)
{:error, _} ->
put_error(conn, :unknown)
end end
_ -> _ ->
@ -98,6 +101,6 @@ defmodule MatrixServerWeb.AuthController do
def login(conn, _params) do def login(conn, _params) do
# Other login types and identifiers are unsupported for now. # Other login types and identifiers are unsupported for now.
put_error(conn, :unknown) put_error(conn, :unrecognized, "Only m.login.password is supported currently.")
end end
end end

View file

@ -15,9 +15,10 @@ defmodule MatrixServerWeb.RoomController do
input = apply_changes(cs) input = apply_changes(cs)
# TODO: refactor # TODO: refactor
%Room{id: room_id} = Repo.insert!(Room.create_changeset(input)) # Room.create(account, input)
{:ok, pid} = RoomServer.get_room_server(room_id) # %Room{id: room_id} = Repo.insert!(Room.create_changeset(input))
RoomServer.create_room(pid, account, input) # {:ok, pid} = RoomServer.get_room_server(room_id)
# RoomServer.create_room(pid, account, input)
conn conn
|> put_status(200) |> put_status(200)

View file

@ -2,7 +2,7 @@ defmodule MatrixServerWeb.Plug.Error do
import Plug.Conn import Plug.Conn
import Phoenix.Controller, only: [json: 2] import Phoenix.Controller, only: [json: 2]
@error_code_and_message %{ @error_map %{
bad_json: {400, "M_BAD_JSON", "Bad request."}, bad_json: {400, "M_BAD_JSON", "Bad request."},
user_in_use: {400, "M_USER_IN_USE", "Username is already taken."}, user_in_use: {400, "M_USER_IN_USE", "Username is already taken."},
invalid_username: {400, "M_INVALID_USERNAME", "Invalid username."}, invalid_username: {400, "M_INVALID_USERNAME", "Invalid username."},
@ -13,9 +13,9 @@ defmodule MatrixServerWeb.Plug.Error do
missing_token: {401, "M_MISSING_TOKEN", "Access token required."} missing_token: {401, "M_MISSING_TOKEN", "Access token required."}
} }
def put_error(conn, error) do def put_error(conn, error, msg \\ nil) do
{status, errcode, errmsg} = @error_code_and_message[error] {status, errcode, default_msg} = @error_map[error]
data = %{errcode: errcode, error: errmsg} data = %{errcode: errcode, error: msg or default_msg}
conn conn
|> put_status(status) |> put_status(status)

View file

@ -0,0 +1,10 @@
defmodule MatrixServer.Repo.Migrations.ChangeRoomStateToArray do
use Ecto.Migration
def change do
alter table(:rooms) do
remove :state, :map, default: %{}, null: false
add :state, {:array, {:array, :string}}, default: [], null: false
end
end
end