diff --git a/lib/matrix_server/application.ex b/lib/matrix_server/application.ex index a826484..7f32531 100644 --- a/lib/matrix_server/application.ex +++ b/lib/matrix_server/application.ex @@ -11,15 +11,11 @@ defmodule MatrixServer.Application do MatrixServerWeb.Telemetry, {Phoenix.PubSub, name: MatrixServer.PubSub}, MatrixServerWeb.Endpoint, - MatrixServer.RoomServer - # Start a worker by calling: MatrixServer.Worker.start_link(arg) - # {MatrixServer.Worker, arg} + {Registry, keys: :unique, name: MatrixServer.RoomServer.Registry}, + {DynamicSupervisor, name: MatrixServer.RoomServer.Supervisor, strategy: :one_for_one} ] - # See https://hexdocs.pm/elixir/Supervisor.html - # for other strategies and supported options - opts = [strategy: :one_for_one, name: MatrixServer.Supervisor] - Supervisor.start_link(children, opts) + Supervisor.start_link(children, name: MatrixServer.Supervisor, strategy: :one_for_one) end # Tell Phoenix to update the endpoint configuration diff --git a/lib/matrix_server/event.ex b/lib/matrix_server/event.ex index 3a53936..edf4cd2 100644 --- a/lib/matrix_server/event.ex +++ b/lib/matrix_server/event.ex @@ -54,7 +54,7 @@ defmodule MatrixServer.Event do | type: "m.room.member", state_key: sender, content: %{ - "membership" => "invite" + "membership" => "join" } } end @@ -111,9 +111,10 @@ defmodule MatrixServer.Event do room: %Room{id: room_id} }) do # TODO: state resolution - create_room_event = create_room(room_id, MatrixServer.get_mxid(localpart), room_version) - resolve([events_to_state_set([create_room_event])]) - repo.insert(create_room_event) + create_room(room_id, MatrixServer.get_mxid(localpart), room_version) + # resolve([events_to_state_set([create_room_event])]) + # MatrixServer.StateResolution.resolve(create_room_event) + # repo.insert(create_room_event) end def room_creation_join_creator(repo, %{ @@ -244,16 +245,18 @@ defmodule MatrixServer.Event do MapSet.size(events) == 1 end) - unconflicted_state_map = Enum.into(unconflicted, %{}, fn {state_pair, events} -> - event = MapSet.to_list(events) |> hd() + unconflicted_state_map = + Enum.into(unconflicted, %{}, fn {state_pair, events} -> + event = MapSet.to_list(events) |> hd() - {state_pair, event} - end) + {state_pair, event} + end) - conflicted_state_set = Enum.reduce(conflicted, MapSet.new(), fn {_, events}, acc -> - MapSet.union(acc, events) - end) - |> MapSet.delete(nil) + conflicted_state_set = + Enum.reduce(conflicted, MapSet.new(), fn {_, events}, acc -> + MapSet.union(acc, events) + end) + |> MapSet.delete(nil) {unconflicted_state_map, conflicted_state_set} end diff --git a/lib/matrix_server/room.ex b/lib/matrix_server/room.ex index 22cf895..a277344 100644 --- a/lib/matrix_server/room.ex +++ b/lib/matrix_server/room.ex @@ -3,12 +3,13 @@ defmodule MatrixServer.Room do import Ecto.Changeset - alias __MODULE__ + alias MatrixServer.{Room, Event} alias MatrixServerWeb.API.CreateRoom @primary_key {:id, :string, []} schema "rooms" do field :visibility, Ecto.Enum, values: [:public, :private] + has_many :events, Event, foreign_key: :event_id end def changeset(room, params \\ %{}) do diff --git a/lib/matrix_server/room_server.ex b/lib/matrix_server/room_server.ex index e791681..255acf9 100644 --- a/lib/matrix_server/room_server.ex +++ b/lib/matrix_server/room_server.ex @@ -1,41 +1,76 @@ defmodule MatrixServer.RoomServer do use GenServer - alias MatrixServer.{Repo, Room, Event} + alias MatrixServer.{Repo, Room, Event, Account} alias MatrixServerWeb.API.CreateRoom alias Ecto.Multi - def start_link(_opts) do - GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + @registry MatrixServer.RoomServer.Registry + @supervisor MatrixServer.RoomServer.Supervisor + + def get_room_server(room_id) do + case Registry.lookup(@registry, room_id) do + [{pid, _}] -> + {:ok, pid} + + [] -> + opts = [ + room_id: room_id, + name: {:via, Registry, {@registry, room_id}} + ] + + DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts}) + end end - def create_room(%CreateRoom{} = input, account) do - GenServer.call(__MODULE__, {:create_room, input, account}) + def start_link(opts) do + {name, opts} = Keyword.pop(opts, :name) + GenServer.start_link(__MODULE__, opts, name: name) + end + + def create_room(input, account) do + Multi.new() + |> Multi.insert(:room, Room.create_changeset(input)) + |> Multi.run(:room_server, fn _repo, %{room: %Room{id: room_id} = room} -> + opts = [ + name: {:via, Registry, {@registry, room_id}}, + input: input, + account: account, + room: room + ] + + DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts}) + end) + |> Repo.transaction() end @impl true - def init(:ok) do - {:ok, %{}} + def init(opts) do + %Room{id: room_id} = Keyword.fetch!(opts, :room) + input = Keyword.fetch!(opts, :input) + account = Keyword.fetch!(opts, :account) + + state = %{ + room_id: room_id, + state_set: %{} + } + + Repo.transaction(fn -> + with {:ok, create_room_event} <- insert_create_room_event(account, input, state) do + {:ok, state} + end + end) + + {:ok, state} end - @impl true - def handle_call({:create_room, input, account}, _from, state) do - # TODO: preset events, initial_state events, invite, invite_3pid - result = - Multi.new() - |> Multi.put(:input, input) - |> Multi.put(:account, account) - |> Multi.insert(:room, Room.create_changeset(input)) - |> Multi.run(:create_room_event, &Event.room_creation_create_room/2) - |> Multi.run(:join_creator_event, &Event.room_creation_join_creator/2) - |> Multi.run(:power_levels_event, &Event.room_creation_power_levels/2) - |> Multi.run(:name_event, &Event.room_creation_name/2) - |> Multi.run(:topic_event, &Event.room_creation_topic/2) - |> Multi.run(:temp, fn _, _ -> - {:error, :lol} - end) - |> Repo.transaction() - - {:reply, result, state} + defp insert_create_room_event( + %Account{localpart: localpart}, + %CreateRoom{room_version: room_version}, + %{room_id: room_id, state_set: state_set} + ) do + create_room_event = Event.create_room(room_id, MatrixServer.get_mxid(localpart), room_version) + MatrixServer.StateResolution.resolve(create_room_event) + {:ok, create_room_event} end end diff --git a/lib/matrix_server/state_resolution.ex b/lib/matrix_server/state_resolution.ex index d6eeec2..d4c7186 100644 --- a/lib/matrix_server/state_resolution.ex +++ b/lib/matrix_server/state_resolution.ex @@ -1,200 +1,108 @@ -# https://matrix.uhoreg.ca/stateres/reloaded.html defmodule MatrixServer.StateResolution do - @derive {Inspect, except: [:prev_events, :auth_events]} - defstruct [ - :event_id, - :event_type, - :timestamp, - :state_key, - :sender, - :content, - :prev_events, - :auth_events, - :power_levels - ] + import Ecto.Query - alias __MODULE__, as: Event + alias MatrixServer.{Repo, Event} - @type t :: %Event{event_id: String.t(), event_type: Atom.t(), timestamp: Integer.t()} + def example do + %Event{content: content} = event = Event.power_levels("room1", "charlie") + event = %Event{event | content: %{content | "ban" => 0}} - def new_state_event, do: %Event{new() | event_type: :state} - def new_message_event, do: %Event{new() | event_type: :message} - - # TODO: remove state_key default here - def new do - %Event{ - event_id: "", - timestamp: 0, - state_key: "", - sender: "", - content: "", - prev_events: [], - auth_events: [], - power_levels: %{} - } + event + |> Map.put(:prev_events, ["b", "fork"]) + |> Map.put(:auth_events, ["create", "join_charlie", "b"]) end - def join(user), do: %Event{membership(user) | content: "join"} - def leave(user), do: %Event{membership(user) | content: "leave"} - def invite(actor, subject), do: %Event{membership(actor, subject) | content: "invite"} - def kick(actor, subject), do: %Event{membership(actor, subject) | content: "leave"} - def ban(actor, subject), do: %Event{membership(actor, subject) | content: "ban"} - - def set_power_levels(user, power_levels) do - %Event{new() | event_type: :power_levels, sender: user, power_levels: power_levels} + def resolve(%Event{type: type, state_key: state_key, event_id: event_id, prev_events: prev_event_ids}) do + state_sets = Event + |> where([e], e.event_id in ^prev_event_ids) + |> Repo.all() + |> Enum.map(&resolve/1) + + resolved_state = resolve(state_sets) + # TODO: check if state event + Map.put(resolved_state, {type, state_key}, event_id) end - def set_topic(user, topic) do - %Event{new() | event_type: :topic, sender: user, content: topic} + def resolve([]), do: %{} + + def resolve(state_sets) do + {unconflicted_state_map, conflicted_state_set} = calculate_conflict(state_sets) + if MapSet.size(conflicted_state_set) == 0 do + unconflicted_state_map + else + full_conflicted_set = MapSet.union(conflicted_state_set, auth_difference(state_sets)) + + conflicted_control_event_ids = + Enum.filter(full_conflicted_set, &is_control_event/1) |> MapSet.new() + + conflicted_control_event_ids_with_auth = + conflicted_control_event_ids + |> MapSet.to_list() + |> full_auth_chain() + |> MapSet.intersection(full_conflicted_set) + |> MapSet.union(conflicted_control_event_ids) + + conflicted_control_events_with_auth = + Repo.all( + from e in Event, + where: e.event_id in ^MapSet.to_list(conflicted_control_event_ids_with_auth) + ) + + sorted_control_events = Enum.sort(conflicted_control_events_with_auth, &rev_top_pow_order/2) + + partial_resolved_state = iterative_auth_checks(sorted_control_events, unconflicted_state_map) + + other_conflicted_event_ids = + MapSet.difference(full_conflicted_set, conflicted_control_event_ids_with_auth) + + other_conflicted_events = + Repo.all(from e in Event, where: e.event_id in ^MapSet.to_list(other_conflicted_event_ids)) + + resolved_power_levels = partial_resolved_state[{"m.room.power_levels", ""}] + + sorted_other_events = + Enum.sort(other_conflicted_events, mainline_order(resolved_power_levels)) + + nearly_final_state = iterative_auth_checks(sorted_other_events, partial_resolved_state) + + Map.merge(nearly_final_state, unconflicted_state_map) + end end - def get_state_set_from_event_list(events) do - Enum.reduce(events, %{}, fn - %Event{event_type: event_type, state_key: state_key} = event, acc -> - Map.put(acc, {event_type, state_key}, event) - end) - end - - def auth_chain(event), do: auth_chain(event, MapSet.new()) - - def auth_chain(%Event{auth_events: auth_events}, set) do - Enum.reduce(auth_events, set, fn event, acc -> - event - |> auth_chain() - |> MapSet.union(acc) - |> MapSet.put(event) - end) - end - - def in_room(user, state_set) when is_map_key(state_set, {:membership, user}) do - state_set[{:membership, user}].content == "join" - end - - def in_room(_, _), do: false - - def get_power_levels(state_set) when is_map_key(state_set, {:power_levels, ""}) do - state_set[{:power_levels, ""}].power_levels - end - - def get_power_levels(_), do: nil - - def has_power_level(_, nil, _), do: true - - def has_power_level(user, power_levels, level) do - Map.get(power_levels, user, 0) >= level - end - - # No join rules specified, allow joining for room creator only. - def allowed_to_join(user, state_set) when not is_map_key(state_set, {:join_rules, ""}) do - state_set[{:create, ""}].sender == user - end - - # TODO: join and power levels events - def is_authorized(%Event{event_type: :create, prev_events: prev_events}, _), - do: prev_events == [] - - def is_authorized(%Event{event_type: :membership, content: "join", state_key: user}, state_set) do - allowed_to_join(user, state_set) - end - - def is_authorized(%Event{sender: sender} = event, state_set) do - in_room(sender, state_set) and - has_power_level(sender, get_power_levels(state_set), get_event_power_level(event)) - end - - def is_authorized2(%Event{auth_events: auth_events} = event, state_set) do - state_set = - Enum.reduce(auth_events, state_set, fn %Event{event_type: event_type, state_key: state_key} = - event, - acc -> - Map.put_new(acc, {event_type, state_key}, event) - end) - - is_authorized(event, state_set) - end - - def iterative_auth_checks(events, state_set) do - Enum.reduce(events, state_set, fn event, acc -> - if is_authorized2(event, acc), do: insert_event(event, acc), else: acc - end) - end - - def insert_event(%Event{event_type: event_type, state_key: state_key} = event, state_set) do - Map.put(state_set, {event_type, state_key}, event) - end - - def is_control_event(%Event{event_type: :power_levels, state_key: ""}), do: true - - def is_control_event(%Event{event_type: :join_rules, state_key: ""}), do: true - - def is_control_event(%Event{ - event_type: :membership, - state_key: state_key, - sender: sender, - content: "ban" - }), - do: sender != state_key - - def is_control_event(%Event{ - event_type: :membership, - state_key: state_key, - sender: sender, - content: "leave" - }), - do: sender != state_key - - def is_control_event(_), do: false - def calculate_conflict(state_sets) do - domain = + {unconflicted, conflicted} = state_sets - |> Enum.map(&Map.keys/1) - |> List.flatten() + |> Enum.flat_map(&Map.keys/1) |> MapSet.new() - - full_state_map_list = - Enum.map(domain, fn k -> + |> Enum.into(%{}, fn state_pair -> events = - Enum.map(state_sets, &Map.get(&1, k)) + Enum.map(state_sets, &Map.get(&1, state_pair)) |> MapSet.new() - {k, events} + {state_pair, events} end) - - {unconflicted, conflicted} = - Enum.split_with(full_state_map_list, fn {_k, events} -> + |> Enum.split_with(fn {_, events} -> MapSet.size(events) == 1 end) unconflicted_state_map = - Enum.map(unconflicted, fn {k, events} -> - event = - events - |> MapSet.to_list() - |> hd() + Enum.into(unconflicted, %{}, fn {state_pair, events} -> + event = MapSet.to_list(events) |> hd() - {k, event} + {state_pair, event} end) - |> Enum.into(%{}) - conflicted_state_map = - Enum.flat_map(conflicted, fn {_, events} -> - events - |> MapSet.delete(nil) - |> MapSet.to_list() + conflicted_state_set = + Enum.reduce(conflicted, MapSet.new(), fn {_, events}, acc -> + MapSet.union(acc, events) end) - |> MapSet.new() + |> MapSet.delete(nil) - {unconflicted_state_map, conflicted_state_map} - end - - def full_auth_chain(events) do - events - |> Enum.map(&auth_chain/1) - |> Enum.reduce(MapSet.new(), &MapSet.union/2) + {unconflicted_state_map, conflicted_state_set} end def auth_difference(state_sets) do + # TODO: memoization possible full_auth_chains = Enum.map(state_sets, fn state_set -> state_set @@ -208,9 +116,50 @@ defmodule MatrixServer.StateResolution do MapSet.difference(auth_chain_union, auth_chain_intersection) end + def full_auth_chain(event_ids) do + event_ids + |> Enum.map(&auth_chain/1) + |> Enum.reduce(MapSet.new(), &MapSet.union/2) + end + + def auth_chain(event_id) do + # TODO: handle when auth event is not found. + Event + |> where([e], e.event_id == ^event_id) + |> select([e], e.auth_events) + |> Repo.one!() + |> Enum.reduce(MapSet.new(), fn auth_event_id, acc -> + auth_event_id + |> auth_chain() + |> MapSet.union(acc) + |> MapSet.put(auth_event_id) + end) + end + + def is_control_event(event_id) when is_binary(event_id) do + Event + |> where([e], e.event_id == ^event_id) + |> Repo.one!() + |> is_control_event() + end + + def is_control_event(%Event{type: "m.room.power_levels", state_key: ""}), do: true + def is_control_event(%Event{type: "m.room.join_rules", state_key: ""}), do: true + + def is_control_event(%Event{ + type: "m.room.member", + state_key: state_key, + sender: sender, + content: %{membership: membership} + }) + when sender != state_key and membership in ["leave", "ban"], + do: true + + def is_control_event(_), do: false + def rev_top_pow_order( - %Event{timestamp: timestamp1, event_id: event_id1} = event1, - %Event{timestamp: timestamp2, event_id: event_id2} = event2 + %Event{origin_server_ts: timestamp1, event_id: event_id1} = event1, + %Event{origin_server_ts: timestamp2, event_id: event_id2} = event2 ) do {power1, power2} = {get_power_level(event1), get_power_level(event2)} @@ -225,37 +174,31 @@ defmodule MatrixServer.StateResolution do end end - def get_power_level(%Event{sender: sender, auth_events: auth_events}) do - pl_event = Enum.find(auth_events, &(&1.event_type == :power_levels)) + def get_power_level(%Event{sender: sender, auth_events: auth_event_ids}) do + pl_content = + Event + |> where([e], e.event_id in ^auth_event_ids) + |> where([e], e.type == "m.room.power_levels") + |> select([e], e.content) + |> Repo.one() - case pl_event do - %Event{power_levels: power_levels} -> Map.get(power_levels, sender, 0) - _ -> 0 + case pl_content do + %{"users" => pl_users} -> Map.get(pl_users, sender, 0) + nil -> 0 end end - def mainline(event) do - event - |> mainline([]) - |> Enum.reverse() - end - - def mainline(%Event{auth_events: auth_events} = event, acc) do - case Enum.find(auth_events, &(&1.event_type == :power_levels)) do - nil -> [event | acc] - pl_event -> mainline(pl_event, [event | acc]) - end - end - - def mainline_order(p) do + def mainline_order(event_id) do mainline_map = - p + Event + |> where([e], e.event_id == ^event_id) + |> Repo.one!() |> mainline() |> Enum.with_index() |> Enum.into(%{}) - fn %Event{timestamp: timestamp1, event_id: event_id1} = event1, - %Event{timestamp: timestamp2, event_id: event_id2} = event2 -> + fn %Event{origin_server_ts: timestamp1, event_id: event_id1} = event1, + %Event{origin_server_ts: timestamp2, event_id: event_id2} = event2 -> mainline_depth1 = get_mainline_depth(mainline_map, event1) mainline_depth2 = get_mainline_depth(mainline_map, event2) @@ -289,181 +232,110 @@ defmodule MatrixServer.StateResolution do end end - def resolve(state_sets) do - {unconflicted_state_map, conflicted_set} = calculate_conflict(state_sets) - full_conflicted_set = MapSet.union(conflicted_set, auth_difference(state_sets)) + def mainline(event) do + event + |> mainline([]) + |> Enum.reverse() + end - conflicted_control_events = - Enum.filter(full_conflicted_set, &is_control_event/1) |> MapSet.new() + def mainline(%Event{auth_events: auth_event_ids} = event, acc) do + pl_event = + Event + |> where([e], e.event_id in ^auth_event_ids) + |> where([e], e.type == "m.room.power_levels") + |> Repo.one() - conflicted_control_events_with_auth = - MapSet.union( - conflicted_control_events, - MapSet.intersection( - full_conflicted_set, - full_auth_chain(MapSet.to_list(conflicted_control_events)) - ) + case pl_event do + %Event{} -> mainline(pl_event, [event | acc]) + nil -> [event | acc] + end + end + + def iterative_auth_checks(events, state_set) do + Enum.reduce(events, state_set, fn event, acc -> + if is_authorized2(event, acc), do: insert_event(event, acc), else: acc + end) + end + + def insert_event(%Event{type: event_type, state_key: state_key, event_id: event_id}, state_set) do + Map.put(state_set, {event_type, state_key}, event_id) + end + + def is_authorized2(%Event{auth_events: auth_event_ids} = event, state_set) do + state_set = + Event + |> where([e], e.event_id in ^auth_event_ids) + |> Repo.all() + |> Enum.reduce(state_set, fn %Event{ + type: event_type, + state_key: state_key, + event_id: event_id + }, + acc -> + Map.put_new(acc, {event_type, state_key}, event_id) + end) + + is_authorized(event, state_set) + end + + # TODO: join and power levels events + def is_authorized(%Event{type: "m.room.create", prev_events: prev_events}, _), + do: prev_events == [] + + def is_authorized( + %Event{type: "m.room.member", content: %{"membership" => "join"}, state_key: user}, + state_set + ) do + allowed_to_join(user, state_set) + end + + def is_authorized(%Event{sender: sender} = event, state_set) do + in_room(sender, state_set) and + has_power_level(sender, get_power_levels(state_set), get_event_power_level(event)) + end + + def in_room(user, state_set) when is_map_key(state_set, {"m.room.member", user}) do + content = + Repo.one!( + from e in Event, + where: e.event_id == ^state_set[{"m.room.member", user}], + select: e.content ) - sorted_control_events = Enum.sort(conflicted_control_events_with_auth, &rev_top_pow_order/2) - partial_resolved_state = iterative_auth_checks(sorted_control_events, unconflicted_state_map) - - other_conflicted_events = - MapSet.difference(full_conflicted_set, conflicted_control_events_with_auth) - - resolved_power_levels = partial_resolved_state[{:power_levels, ""}] - - sorted_other_events = - Enum.sort(other_conflicted_events, mainline_order(resolved_power_levels)) - - nearly_final_state = iterative_auth_checks(sorted_other_events, partial_resolved_state) - - Map.merge(nearly_final_state, unconflicted_state_map) + case content["membership"] do + "join" -> true + _ -> false + end end - def example1 do - create = %Event{new() | event_id: "create", event_type: :create, sender: "@alice:example.com"} + def in_room(_, _), do: false - alice_joins = %Event{ - join("@alice:example.com") - | event_id: "alice joins", - prev_events: [create], - auth_events: [create] - } - - pl = %Event{ - set_power_levels("@alice:example.com", %{"@alice:example.com" => 100}) - | event_id: "power level", - prev_events: [alice_joins], - auth_events: [alice_joins, create] - } - - join_rules = %Event{ - new() - | event_id: "join rules", - event_type: :join_rules, - sender: "@alice:example.com", - content: "private", - prev_events: [pl], - auth_events: [pl, alice_joins, create] - } - - invite_bob = %Event{ - invite("@alice:example.com", "@bob:example.com") - | event_id: "invite bob", - prev_events: [join_rules], - auth_events: [pl, alice_joins, create] - } - - invite_carol = %Event{ - invite("@alice:example.com", "@carol:example.com") - | event_id: "invite carol", - prev_events: [invite_bob], - auth_events: [pl, alice_joins, create] - } - - bob_join = %Event{ - join("@bob:example.com") - | event_id: "bob joins", - prev_events: [invite_carol], - auth_events: [invite_bob, join_rules, create] - } - - [create, alice_joins, pl, join_rules, invite_bob, invite_carol, bob_join] + def get_power_levels(state_set) when is_map_key(state_set, {"m.room.power_levels", ""}) do + Repo.one!( + from e in Event, + where: e.event_id == ^state_set[{"m.room.power_levels", ""}], + select: e.content + ) end - def example2 do - create = %Event{ - new_state_event() - | event_id: "create", - event_type: :create, - sender: "@alice:example.com" - } + def get_power_levels(_), do: nil - alice_joins = join("@alice:example.com") - - pl1 = %Event{ - set_power_levels("@alice:example.com", %{"@alice:example.com" => 100}) - | event_id: "power levels 1" - } - - pl2 = %Event{ - set_power_levels("@alice:example.com", %{ - "@alice:example.com" => 100, - "@bob:example.com" => 50 - }) - | event_id: "power levels 2" - } - - topic = %Event{set_topic("@alice:example.com", "This is a topic") | event_id: "topic"} - - state_set1 = get_state_set_from_event_list([create, alice_joins, pl1]) - state_set2 = get_state_set_from_event_list([create, alice_joins, pl2, topic]) - state_set3 = get_state_set_from_event_list([create, alice_joins, pl2]) - [state_set1, state_set2, state_set3] + def has_power_level(user, %{"users" => users}, level) do + Map.get(users, user, 0) >= level end - def example3 do - pl1 = %Event{set_power_levels("alice", %{}) | event_id: "pl1", timestamp: 1} - - pl2 = %Event{ - set_power_levels("alice", %{}) - | event_id: "pl2", - auth_events: [pl1], - timestamp: 2 - } - - pl3 = %Event{ - set_power_levels("alice", %{}) - | event_id: "pl3", - auth_events: [pl1], - timestamp: 4 - } - - pl4 = %Event{ - set_power_levels("alice", %{}) - | event_id: "pl4", - auth_events: [pl2], - timestamp: 6 - } - - pl5 = %Event{ - set_power_levels("alice", %{}) - | event_id: "pl5", - auth_events: [pl4], - timestamp: 6 - } - - pl6 = %Event{ - set_power_levels("alice", %{}) - | event_id: "pl6", - auth_events: [pl4], - timestamp: 5 - } - - pl7 = %Event{ - set_power_levels("alice", %{}) - | event_id: "pl7", - auth_events: [pl2], - timestamp: 5 - } - - pl8 = %Event{ - set_power_levels("alice", %{}) - | event_id: "pl8", - auth_events: [pl7], - timestamp: 6 - } - - [pl1, pl2, pl3, pl4, pl5, pl6, pl7, pl8] - end - - defp membership(user), do: membership(user, user) - - defp membership(actor, subject) do - %Event{new() | event_type: :membership, sender: actor, state_key: subject} + def has_power_level(_, _, _) do + true end defp get_event_power_level(%Event{state_key: ""}), do: 0 defp get_event_power_level(_), do: 50 + + # No join rules specified, allow joining for room creator only. + def allowed_to_join(user, state_set) + when not is_map_key(state_set, {"m.room.join_rules", ""}) do + Repo.one!( + from e in Event, where: e.event_id == ^state_set[{"m.room.create", ""}], select: e.sender + ) == user + end end diff --git a/lib/matrix_server/state_resolution_example.ex b/lib/matrix_server/state_resolution_example.ex new file mode 100644 index 0000000..bef12c3 --- /dev/null +++ b/lib/matrix_server/state_resolution_example.ex @@ -0,0 +1,469 @@ +# https://matrix.uhoreg.ca/stateres/reloaded.html +defmodule MatrixServer.StateResolutionExample do + @derive {Inspect, except: [:prev_events, :auth_events]} + defstruct [ + :event_id, + :event_type, + :timestamp, + :state_key, + :sender, + :content, + :prev_events, + :auth_events, + :power_levels + ] + + alias __MODULE__, as: Event + + @type t :: %Event{event_id: String.t(), event_type: Atom.t(), timestamp: Integer.t()} + + def new_state_event, do: %Event{new() | event_type: :state} + def new_message_event, do: %Event{new() | event_type: :message} + + # TODO: remove state_key default here + def new do + %Event{ + event_id: "", + timestamp: 0, + state_key: "", + sender: "", + content: "", + prev_events: [], + auth_events: [], + power_levels: %{} + } + end + + def join(user), do: %Event{membership(user) | content: "join"} + def leave(user), do: %Event{membership(user) | content: "leave"} + def invite(actor, subject), do: %Event{membership(actor, subject) | content: "invite"} + def kick(actor, subject), do: %Event{membership(actor, subject) | content: "leave"} + def ban(actor, subject), do: %Event{membership(actor, subject) | content: "ban"} + + def set_power_levels(user, power_levels) do + %Event{new() | event_type: :power_levels, sender: user, power_levels: power_levels} + end + + def set_topic(user, topic) do + %Event{new() | event_type: :topic, sender: user, content: topic} + end + + def get_state_set_from_event_list(events) do + Enum.reduce(events, %{}, fn + %Event{event_type: event_type, state_key: state_key} = event, acc -> + Map.put(acc, {event_type, state_key}, event) + end) + end + + def auth_chain(event), do: auth_chain(event, MapSet.new()) + + def auth_chain(%Event{auth_events: auth_events}, set) do + Enum.reduce(auth_events, set, fn event, acc -> + event + |> auth_chain() + |> MapSet.union(acc) + |> MapSet.put(event) + end) + end + + def in_room(user, state_set) when is_map_key(state_set, {:membership, user}) do + state_set[{:membership, user}].content == "join" + end + + def in_room(_, _), do: false + + def get_power_levels(state_set) when is_map_key(state_set, {:power_levels, ""}) do + state_set[{:power_levels, ""}].power_levels + end + + def get_power_levels(_), do: nil + + def has_power_level(_, nil, _), do: true + + def has_power_level(user, power_levels, level) do + Map.get(power_levels, user, 0) >= level + end + + # No join rules specified, allow joining for room creator only. + def allowed_to_join(user, state_set) when not is_map_key(state_set, {:join_rules, ""}) do + state_set[{:create, ""}].sender == user + end + + # TODO: join and power levels events + def is_authorized(%Event{event_type: :create, prev_events: prev_events}, _), + do: prev_events == [] + + def is_authorized(%Event{event_type: :membership, content: "join", state_key: user}, state_set) do + allowed_to_join(user, state_set) + end + + def is_authorized(%Event{sender: sender} = event, state_set) do + in_room(sender, state_set) and + has_power_level(sender, get_power_levels(state_set), get_event_power_level(event)) + end + + def is_authorized2(%Event{auth_events: auth_events} = event, state_set) do + state_set = + Enum.reduce(auth_events, state_set, fn %Event{event_type: event_type, state_key: state_key} = + event, + acc -> + Map.put_new(acc, {event_type, state_key}, event) + end) + + is_authorized(event, state_set) + end + + def iterative_auth_checks(events, state_set) do + Enum.reduce(events, state_set, fn event, acc -> + if is_authorized2(event, acc), do: insert_event(event, acc), else: acc + end) + end + + def insert_event(%Event{event_type: event_type, state_key: state_key} = event, state_set) do + Map.put(state_set, {event_type, state_key}, event) + end + + def is_control_event(%Event{event_type: :power_levels, state_key: ""}), do: true + + def is_control_event(%Event{event_type: :join_rules, state_key: ""}), do: true + + def is_control_event(%Event{ + event_type: :membership, + state_key: state_key, + sender: sender, + content: "ban" + }), + do: sender != state_key + + def is_control_event(%Event{ + event_type: :membership, + state_key: state_key, + sender: sender, + content: "leave" + }), + do: sender != state_key + + def is_control_event(_), do: false + + def calculate_conflict(state_sets) do + domain = + state_sets + |> Enum.map(&Map.keys/1) + |> List.flatten() + |> MapSet.new() + + full_state_map_list = + Enum.map(domain, fn k -> + events = + Enum.map(state_sets, &Map.get(&1, k)) + |> MapSet.new() + + {k, events} + end) + + {unconflicted, conflicted} = + Enum.split_with(full_state_map_list, fn {_k, events} -> + MapSet.size(events) == 1 + end) + + unconflicted_state_map = + Enum.map(unconflicted, fn {k, events} -> + event = + events + |> MapSet.to_list() + |> hd() + + {k, event} + end) + |> Enum.into(%{}) + + conflicted_state_map = + Enum.flat_map(conflicted, fn {_, events} -> + events + |> MapSet.delete(nil) + |> MapSet.to_list() + end) + |> MapSet.new() + + {unconflicted_state_map, conflicted_state_map} + end + + def full_auth_chain(events) do + events + |> Enum.map(&auth_chain/1) + |> Enum.reduce(MapSet.new(), &MapSet.union/2) + end + + def auth_difference(state_sets) do + full_auth_chains = + Enum.map(state_sets, fn state_set -> + state_set + |> Map.values() + |> full_auth_chain() + end) + + auth_chain_union = Enum.reduce(full_auth_chains, MapSet.new(), &MapSet.union/2) + auth_chain_intersection = Enum.reduce(full_auth_chains, MapSet.new(), &MapSet.intersection/2) + + MapSet.difference(auth_chain_union, auth_chain_intersection) + end + + def rev_top_pow_order( + %Event{timestamp: timestamp1, event_id: event_id1} = event1, + %Event{timestamp: timestamp2, event_id: event_id2} = event2 + ) do + {power1, power2} = {get_power_level(event1), get_power_level(event2)} + + if power1 == power2 do + if timestamp1 == timestamp2 do + event_id1 <= event_id2 + else + timestamp1 < timestamp2 + end + else + power1 < power2 + end + end + + def get_power_level(%Event{sender: sender, auth_events: auth_events}) do + pl_event = Enum.find(auth_events, &(&1.event_type == :power_levels)) + + case pl_event do + %Event{power_levels: power_levels} -> Map.get(power_levels, sender, 0) + _ -> 0 + end + end + + def mainline(event) do + event + |> mainline([]) + |> Enum.reverse() + end + + def mainline(%Event{auth_events: auth_events} = event, acc) do + case Enum.find(auth_events, &(&1.event_type == :power_levels)) do + nil -> [event | acc] + pl_event -> mainline(pl_event, [event | acc]) + end + end + + def mainline_order(p) do + mainline_map = + p + |> mainline() + |> Enum.with_index() + |> Enum.into(%{}) + + fn %Event{timestamp: timestamp1, event_id: event_id1} = event1, + %Event{timestamp: timestamp2, event_id: event_id2} = event2 -> + mainline_depth1 = get_mainline_depth(mainline_map, event1) + mainline_depth2 = get_mainline_depth(mainline_map, event2) + + if mainline_depth1 == mainline_depth2 do + if timestamp1 == timestamp2 do + event_id1 <= event_id2 + else + timestamp1 < timestamp2 + end + else + mainline_depth1 < mainline_depth2 + end + end + end + + defp get_mainline_depth(mainline_map, event) do + mainline = mainline(event) + + result = + Enum.find_value(mainline, fn mainline_event -> + if Map.has_key?(mainline_map, mainline_event) do + {:ok, mainline_map[mainline_event]} + else + nil + end + end) + + case result do + {:ok, index} -> -index + nil -> nil + end + end + + def resolve(state_sets) do + {unconflicted_state_map, conflicted_set} = calculate_conflict(state_sets) + full_conflicted_set = MapSet.union(conflicted_set, auth_difference(state_sets)) + + conflicted_control_events = + Enum.filter(full_conflicted_set, &is_control_event/1) |> MapSet.new() + + conflicted_control_events_with_auth = + MapSet.union( + conflicted_control_events, + MapSet.intersection( + full_conflicted_set, + full_auth_chain(MapSet.to_list(conflicted_control_events)) + ) + ) + + sorted_control_events = Enum.sort(conflicted_control_events_with_auth, &rev_top_pow_order/2) + partial_resolved_state = iterative_auth_checks(sorted_control_events, unconflicted_state_map) + + other_conflicted_events = + MapSet.difference(full_conflicted_set, conflicted_control_events_with_auth) + + resolved_power_levels = partial_resolved_state[{:power_levels, ""}] + + sorted_other_events = + Enum.sort(other_conflicted_events, mainline_order(resolved_power_levels)) + + nearly_final_state = iterative_auth_checks(sorted_other_events, partial_resolved_state) + + Map.merge(nearly_final_state, unconflicted_state_map) + end + + def example1 do + create = %Event{new() | event_id: "create", event_type: :create, sender: "@alice:example.com"} + + alice_joins = %Event{ + join("@alice:example.com") + | event_id: "alice joins", + prev_events: [create], + auth_events: [create] + } + + pl = %Event{ + set_power_levels("@alice:example.com", %{"@alice:example.com" => 100}) + | event_id: "power level", + prev_events: [alice_joins], + auth_events: [alice_joins, create] + } + + join_rules = %Event{ + new() + | event_id: "join rules", + event_type: :join_rules, + sender: "@alice:example.com", + content: "private", + prev_events: [pl], + auth_events: [pl, alice_joins, create] + } + + invite_bob = %Event{ + invite("@alice:example.com", "@bob:example.com") + | event_id: "invite bob", + prev_events: [join_rules], + auth_events: [pl, alice_joins, create] + } + + invite_carol = %Event{ + invite("@alice:example.com", "@carol:example.com") + | event_id: "invite carol", + prev_events: [invite_bob], + auth_events: [pl, alice_joins, create] + } + + bob_join = %Event{ + join("@bob:example.com") + | event_id: "bob joins", + prev_events: [invite_carol], + auth_events: [invite_bob, join_rules, create] + } + + [create, alice_joins, pl, join_rules, invite_bob, invite_carol, bob_join] + end + + def example2 do + create = %Event{ + new_state_event() + | event_id: "create", + event_type: :create, + sender: "@alice:example.com" + } + + alice_joins = join("@alice:example.com") + + pl1 = %Event{ + set_power_levels("@alice:example.com", %{"@alice:example.com" => 100}) + | event_id: "power levels 1" + } + + pl2 = %Event{ + set_power_levels("@alice:example.com", %{ + "@alice:example.com" => 100, + "@bob:example.com" => 50 + }) + | event_id: "power levels 2" + } + + topic = %Event{set_topic("@alice:example.com", "This is a topic") | event_id: "topic"} + + state_set1 = get_state_set_from_event_list([create, alice_joins, pl1]) + state_set2 = get_state_set_from_event_list([create, alice_joins, pl2, topic]) + state_set3 = get_state_set_from_event_list([create, alice_joins, pl2]) + [state_set1, state_set2, state_set3] + end + + def example3 do + pl1 = %Event{set_power_levels("alice", %{}) | event_id: "pl1", timestamp: 1} + + pl2 = %Event{ + set_power_levels("alice", %{}) + | event_id: "pl2", + auth_events: [pl1], + timestamp: 2 + } + + pl3 = %Event{ + set_power_levels("alice", %{}) + | event_id: "pl3", + auth_events: [pl1], + timestamp: 4 + } + + pl4 = %Event{ + set_power_levels("alice", %{}) + | event_id: "pl4", + auth_events: [pl2], + timestamp: 6 + } + + pl5 = %Event{ + set_power_levels("alice", %{}) + | event_id: "pl5", + auth_events: [pl4], + timestamp: 6 + } + + pl6 = %Event{ + set_power_levels("alice", %{}) + | event_id: "pl6", + auth_events: [pl4], + timestamp: 5 + } + + pl7 = %Event{ + set_power_levels("alice", %{}) + | event_id: "pl7", + auth_events: [pl2], + timestamp: 5 + } + + pl8 = %Event{ + set_power_levels("alice", %{}) + | event_id: "pl8", + auth_events: [pl7], + timestamp: 6 + } + + [pl1, pl2, pl3, pl4, pl5, pl6, pl7, pl8] + end + + defp membership(user), do: membership(user, user) + + defp membership(actor, subject) do + %Event{new() | event_type: :membership, sender: actor, state_key: subject} + end + + defp get_event_power_level(%Event{state_key: ""}), do: 0 + defp get_event_power_level(_), do: 50 +end diff --git a/priv/repo/seeds.exs b/priv/repo/seeds.exs index fa044af..5210db4 100644 --- a/priv/repo/seeds.exs +++ b/priv/repo/seeds.exs @@ -1,11 +1,73 @@ -# Script for populating the database. You can run it as: -# -# mix run priv/repo/seeds.exs -# -# Inside the script, you can read and write to any of your -# repositories directly: -# -# MatrixServer.Repo.insert!(%MatrixServer.SomeSchema{}) -# -# We recommend using the bang functions (`insert!`, `update!` -# and so on) as they will fail if something goes wrong. +alias MatrixServer.{Repo, Room, Event} + +# Auth difference example from here: +# https://matrix.org/docs/guides/implementing-stateres#auth-differences + +Repo.insert!(%Room{ + id: "room1", + visibility: :public +}) + +Repo.insert!( + Event.create_room("room1", "alice", "v1") + |> Map.put(:origin_server_ts, 0) + |> Map.put(:event_id, "create") +) + +Repo.insert!( + Event.join("room1", "alice") + |> Map.put(:prev_events, ["create"]) + |> Map.put(:auth_events, ["create"]) + |> Map.put(:origin_server_ts, 1) + |> Map.put(:event_id, "join_alice") +) + +Repo.insert!( + Event.join("room1", "bob") + |> Map.put(:prev_events, ["join_alice"]) + |> Map.put(:auth_events, ["create"]) + |> Map.put(:origin_server_ts, 2) + |> Map.put(:event_id, "join_bob") +) + +Repo.insert!( + Event.join("room1", "charlie") + |> Map.put(:prev_events, ["join_bob"]) + |> Map.put(:auth_events, ["create"]) + |> Map.put(:origin_server_ts, 3) + |> Map.put(:event_id, "join_charlie") +) + +%Event{content: content} = event = Event.power_levels("room1", "alice") +event = %Event{event | content: %{content | "users" => %{"alice" => 100, "bob" => 100}}} + +Repo.insert!( + event + |> Map.put(:prev_events, ["join_alice"]) + |> Map.put(:auth_events, ["create", "join_alice"]) + |> Map.put(:origin_server_ts, 4) + |> Map.put(:event_id, "a") +) + +%Event{content: content} = event = Event.power_levels("room1", "bob") + +event = %Event{ + event + | content: %{content | "users" => %{"alice" => 100, "bob" => 100, "charlie" => 100}} +} + +Repo.insert!( + event + |> Map.put(:prev_events, ["a"]) + |> Map.put(:auth_events, ["create", "join_bob", "a"]) + |> Map.put(:origin_server_ts, 5) + |> Map.put(:event_id, "b") +) + +Repo.insert!( + Event.room_topic("room1", "alice", "sneed") + |> Map.put(:prev_events, ["a"]) + |> Map.put(:auth_events, ["create", "join_alice", "a"]) + |> Map.put(:origin_server_ts, 5) + |> Map.put(:event_id, "fork") +)