From b9156ed8b302883475a1b60d7694a24bfc58fd8e Mon Sep 17 00:00:00 2001 From: Pim Kunis Date: Sat, 24 Jul 2021 00:13:12 +0200 Subject: [PATCH] Resolve state when inserting create room event Cleanup code --- lib/matrix_server/event.ex | 156 +------------------------- lib/matrix_server/room_server.ex | 59 +++------- lib/matrix_server/state_resolution.ex | 97 ++++++++-------- priv/repo/seeds.exs | 13 ++- 4 files changed, 83 insertions(+), 242 deletions(-) diff --git a/lib/matrix_server/event.ex b/lib/matrix_server/event.ex index edf4cd2..6eaca67 100644 --- a/lib/matrix_server/event.ex +++ b/lib/matrix_server/event.ex @@ -3,8 +3,7 @@ defmodule MatrixServer.Event do import Ecto.Changeset - alias MatrixServer.{Room, Event, Account} - alias MatrixServerWeb.API.CreateRoom + alias MatrixServer.{Room, Event} @primary_key {:event_id, :string, []} schema "events" do @@ -19,7 +18,6 @@ defmodule MatrixServer.Event do end def changeset(event, params \\ %{}) do - # TODO: prev/auth events? event |> cast(params, [:type, :timestamp, :state_key, :sender, :content]) |> validate_required([:type, :timestamp, :sender]) @@ -105,159 +103,7 @@ defmodule MatrixServer.Event do } end - def room_creation_create_room(repo, %{ - input: %CreateRoom{room_version: room_version}, - account: %Account{localpart: localpart}, - room: %Room{id: room_id} - }) do - # TODO: state resolution - create_room(room_id, MatrixServer.get_mxid(localpart), room_version) - # resolve([events_to_state_set([create_room_event])]) - # MatrixServer.StateResolution.resolve(create_room_event) - # repo.insert(create_room_event) - end - - def room_creation_join_creator(repo, %{ - room: %Room{id: room_id}, - create_room_event: %Event{sender: creator, event_id: create_room_id} - }) do - # TODO: state resolution - join(room_id, creator) - |> Map.put(:prev_events, [create_room_id]) - |> Map.put(:auth_events, [create_room_id]) - |> repo.insert() - end - - def room_creation_power_levels( - repo, - %{ - room: %Room{id: room_id}, - create_room_event: %Event{sender: creator, event_id: create_room_id}, - join_creator_event: %Event{event_id: join_creator_id} - } - ) do - # TODO: state resolution - power_levels(room_id, creator) - |> Map.put(:prev_events, [join_creator_id]) - |> Map.put(:auth_events, [create_room_id, join_creator_id]) - |> repo.insert() - end - - def room_creation_name(_repo, %{input: %CreateRoom{name: nil}}), do: {:ok, nil} - - def room_creation_name(_repo, %{input: %CreateRoom{name: name}}) when byte_size(name) > 255, - do: {:error, :name} - - def room_creation_name( - repo, - %{ - input: %CreateRoom{name: name}, - room: %Room{id: room_id}, - create_room_event: %Event{sender: creator, event_id: create_room_id}, - join_creator_event: %Event{event_id: join_creator_id}, - power_levels_event: %Event{event_id: power_levels_id} - } - ) do - # TODO: state resolution - room_name(room_id, creator, name) - |> Map.put(:prev_events, [power_levels_id]) - |> Map.put(:auth_events, [create_room_id, join_creator_id, power_levels_id]) - |> repo.insert() - end - - def room_creation_topic(_repo, %{input: %CreateRoom{topic: nil}}), do: {:ok, nil} - - def room_creation_topic( - repo, - %{ - input: %CreateRoom{topic: topic}, - room: %Room{id: room_id}, - create_room_event: %Event{sender: creator, event_id: create_room_id}, - join_creator_event: %Event{event_id: join_creator_id}, - power_levels_event: %Event{event_id: power_levels_id}, - name_event: name_event - } - ) do - # TODO: state resolution - prev_event = if name_event, do: name_event.event_id, else: power_levels_id - - room_topic(room_id, creator, topic) - |> Map.put(:prev_events, [prev_event]) - |> Map.put(:auth_events, [create_room_id, join_creator_id, power_levels_id]) - |> repo.insert() - end - def generate_event_id do "$" <> MatrixServer.random_string(17) <> ":" <> MatrixServer.server_name() end - - def events_to_state_set(events) do - Enum.into(events, %{}, fn %Event{type: type, state_key: state_key} = event -> - {{type, state_key}, event} - end) - end - - def resolve(state_sets) do - {unconflicted_state_map, conflicted_set} = calculate_conflict(state_sets) - # full_conflicted_set = MapSet.union(conflicted_set, auth_difference(state_sets)) - - # conflicted_control_events = - # Enum.filter(full_conflicted_set, &is_control_event/1) |> MapSet.new() - - # conflicted_control_events_with_auth = - # MapSet.union( - # conflicted_control_events, - # MapSet.intersection( - # full_conflicted_set, - # full_auth_chain(MapSet.to_list(conflicted_control_events)) - # ) - # ) - - # sorted_control_events = Enum.sort(conflicted_control_events_with_auth, &rev_top_pow_order/2) - # partial_resolved_state = iterative_auth_checks(sorted_control_events, unconflicted_state_map) - - # other_conflicted_events = - # MapSet.difference(full_conflicted_set, conflicted_control_events_with_auth) - - # resolved_power_levels = partial_resolved_state[{:power_levels, ""}] - - # sorted_other_events = - # Enum.sort(other_conflicted_events, mainline_order(resolved_power_levels)) - - # nearly_final_state = iterative_auth_checks(sorted_other_events, partial_resolved_state) - - # Map.merge(nearly_final_state, unconflicted_state_map) - end - - def calculate_conflict(state_sets) do - {unconflicted, conflicted} = - state_sets - |> Enum.flat_map(&Map.keys/1) - |> MapSet.new() - |> Enum.map(fn state_pair -> - events = - Enum.map(state_sets, &Map.get(&1, state_pair)) - |> MapSet.new() - - {state_pair, events} - end) - |> Enum.split_with(fn {_k, events} -> - MapSet.size(events) == 1 - end) - - unconflicted_state_map = - Enum.into(unconflicted, %{}, fn {state_pair, events} -> - event = MapSet.to_list(events) |> hd() - - {state_pair, event} - end) - - conflicted_state_set = - Enum.reduce(conflicted, MapSet.new(), fn {_, events}, acc -> - MapSet.union(acc, events) - end) - |> MapSet.delete(nil) - - {unconflicted_state_map, conflicted_state_set} - end end diff --git a/lib/matrix_server/room_server.ex b/lib/matrix_server/room_server.ex index 255acf9..b9b3891 100644 --- a/lib/matrix_server/room_server.ex +++ b/lib/matrix_server/room_server.ex @@ -3,45 +3,26 @@ defmodule MatrixServer.RoomServer do alias MatrixServer.{Repo, Room, Event, Account} alias MatrixServerWeb.API.CreateRoom - alias Ecto.Multi @registry MatrixServer.RoomServer.Registry @supervisor MatrixServer.RoomServer.Supervisor - def get_room_server(room_id) do - case Registry.lookup(@registry, room_id) do - [{pid, _}] -> - {:ok, pid} - - [] -> - opts = [ - room_id: room_id, - name: {:via, Registry, {@registry, room_id}} - ] - - DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts}) - end - end - def start_link(opts) do {name, opts} = Keyword.pop(opts, :name) GenServer.start_link(__MODULE__, opts, name: name) end def create_room(input, account) do - Multi.new() - |> Multi.insert(:room, Room.create_changeset(input)) - |> Multi.run(:room_server, fn _repo, %{room: %Room{id: room_id} = room} -> - opts = [ - name: {:via, Registry, {@registry, room_id}}, - input: input, - account: account, - room: room - ] + %Room{id: room_id} = room = Repo.insert!(Room.create_changeset(input)) - DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts}) - end) - |> Repo.transaction() + opts = [ + name: {:via, Registry, {@registry, room_id}}, + input: input, + account: account, + room: room + ] + + DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts}) end @impl true @@ -50,27 +31,23 @@ defmodule MatrixServer.RoomServer do input = Keyword.fetch!(opts, :input) account = Keyword.fetch!(opts, :account) - state = %{ - room_id: room_id, - state_set: %{} - } - Repo.transaction(fn -> - with {:ok, create_room_event} <- insert_create_room_event(account, input, state) do - {:ok, state} + with {:ok, state_set} <- insert_create_room_event(account, input, room_id) do + {:ok, %{room_id: room_id, state_set: state_set}} end end) - - {:ok, state} end defp insert_create_room_event( %Account{localpart: localpart}, %CreateRoom{room_version: room_version}, - %{room_id: room_id, state_set: state_set} + room_id ) do - create_room_event = Event.create_room(room_id, MatrixServer.get_mxid(localpart), room_version) - MatrixServer.StateResolution.resolve(create_room_event) - {:ok, create_room_event} + state_set = + Event.create_room(room_id, MatrixServer.get_mxid(localpart), room_version) + |> Repo.insert!() + |> MatrixServer.StateResolution.resolve(true) + + {:ok, state_set} end end diff --git a/lib/matrix_server/state_resolution.ex b/lib/matrix_server/state_resolution.ex index ecaa923..548ad93 100644 --- a/lib/matrix_server/state_resolution.ex +++ b/lib/matrix_server/state_resolution.ex @@ -12,7 +12,7 @@ defmodule MatrixServer.StateResolution do |> Map.put(:auth_events, ["create", "join_charlie", "b"]) end - def resolve(%Event{room_id: room_id} = event) do + def resolve(%Event{room_id: room_id} = event, apply_state \\ false) do room_events = Event |> where([e], e.room_id == ^room_id) @@ -20,70 +20,74 @@ defmodule MatrixServer.StateResolution do |> Repo.all() |> Enum.into(%{}) - resolve(event, room_events) + resolve(event, room_events, apply_state) end def resolve( %Event{type: type, state_key: state_key, event_id: event_id, prev_events: prev_event_ids}, - room_events + room_events, + apply_state ) do state_sets = prev_event_ids |> Enum.map(&room_events[&1]) |> Enum.map(&resolve(&1, room_events)) - resolved_state = resolve(state_sets, room_events) + resolved_state = do_resolve(state_sets, room_events) # TODO: check if state event - Map.put(resolved_state, {type, state_key}, event_id) + if apply_state do + Map.put(resolved_state, {type, state_key}, event_id) + else + resolved_state + end end - def resolve([], _), do: %{} + def do_resolve([], _), do: %{} - def resolve(state_sets, room_events) do + def do_resolve(state_sets, room_events) do {unconflicted_state_map, conflicted_state_set} = calculate_conflict(state_sets) if MapSet.size(conflicted_state_set) == 0 do unconflicted_state_map else - full_conflicted_set = - MapSet.union(conflicted_state_set, auth_difference(state_sets, room_events)) - - conflicted_control_event_ids = - Enum.filter(full_conflicted_set, &is_control_event(&1, room_events)) |> MapSet.new() - - conflicted_control_event_ids_with_auth = - conflicted_control_event_ids - |> MapSet.to_list() - |> full_auth_chain(room_events) - |> MapSet.intersection(full_conflicted_set) - |> MapSet.union(conflicted_control_event_ids) - - conflicted_control_events_with_auth = - Enum.map(conflicted_control_event_ids_with_auth, &room_events[&1]) - - sorted_control_events = - Enum.sort(conflicted_control_events_with_auth, rev_top_pow_order(room_events)) - - partial_resolved_state = - iterative_auth_checks(sorted_control_events, unconflicted_state_map, room_events) - - other_conflicted_event_ids = - MapSet.difference(full_conflicted_set, conflicted_control_event_ids_with_auth) - - other_conflicted_events = Enum.map(other_conflicted_event_ids, &room_events[&1]) - - resolved_power_levels = partial_resolved_state[{"m.room.power_levels", ""}] - - sorted_other_events = - Enum.sort(other_conflicted_events, mainline_order(resolved_power_levels, room_events)) - - nearly_final_state = - iterative_auth_checks(sorted_other_events, partial_resolved_state, room_events) - - Map.merge(nearly_final_state, unconflicted_state_map) + do_resolve(state_sets, room_events, unconflicted_state_map, conflicted_state_set) end end + def do_resolve(state_sets, room_events, unconflicted_state_map, conflicted_state_set) do + full_conflicted_set = + MapSet.union(conflicted_state_set, auth_difference(state_sets, room_events)) + + conflicted_control_event_ids = + full_conflicted_set + |> Enum.filter(&is_control_event(&1, room_events)) + |> MapSet.new() + + conflicted_control_events_with_auth_ids = + conflicted_control_event_ids + |> MapSet.to_list() + |> full_auth_chain(room_events) + |> MapSet.intersection(full_conflicted_set) + |> MapSet.union(conflicted_control_event_ids) + + sorted_control_events = + conflicted_control_events_with_auth_ids + |> Enum.map(&room_events[&1]) + |> Enum.sort(rev_top_pow_order(room_events)) + + partial_resolved_state = + iterative_auth_checks(sorted_control_events, unconflicted_state_map, room_events) + + resolved_power_levels = partial_resolved_state[{"m.room.power_levels", ""}] + + conflicted_control_events_with_auth_ids + |> MapSet.difference(full_conflicted_set) + |> Enum.map(&room_events[&1]) + |> Enum.sort(mainline_order(resolved_power_levels, room_events)) + |> iterative_auth_checks(partial_resolved_state, room_events) + |> Map.merge(unconflicted_state_map) + end + def calculate_conflict(state_sets) do {unconflicted, conflicted} = state_sets @@ -256,11 +260,14 @@ defmodule MatrixServer.StateResolution do def iterative_auth_checks(events, state_set, room_events) do Enum.reduce(events, state_set, fn event, acc -> - if is_authorized2(event, acc, room_events), do: insert_event(event, acc), else: acc + if is_authorized2(event, acc, room_events), do: update_state_set(event, acc), else: acc end) end - def insert_event(%Event{type: event_type, state_key: state_key, event_id: event_id}, state_set) do + def update_state_set( + %Event{type: event_type, state_key: state_key, event_id: event_id}, + state_set + ) do Map.put(state_set, {event_type, state_key}, event_id) end diff --git a/priv/repo/seeds.exs b/priv/repo/seeds.exs index 5210db4..e471462 100644 --- a/priv/repo/seeds.exs +++ b/priv/repo/seeds.exs @@ -1,4 +1,15 @@ -alias MatrixServer.{Repo, Room, Event} +alias MatrixServer.{Repo, Room, Event, Account, Device} + +Repo.insert!(%Account{ + localpart: "chuck", + password_hash: Bcrypt.hash_pwd_salt("sneed") +}) + +Repo.insert(%Device{ + device_id: "android", + display_name: "My Android", + localpart: "chuck" +}) # Auth difference example from here: # https://matrix.org/docs/guides/implementing-stateres#auth-differences