Add working implementation of state resolution

Add some database seeds
This commit is contained in:
Pim Kunis 2021-07-23 21:00:01 +02:00
parent 2d34f78a0b
commit 8a2ef0d079
7 changed files with 848 additions and 410 deletions

View file

@ -11,15 +11,11 @@ defmodule MatrixServer.Application do
MatrixServerWeb.Telemetry, MatrixServerWeb.Telemetry,
{Phoenix.PubSub, name: MatrixServer.PubSub}, {Phoenix.PubSub, name: MatrixServer.PubSub},
MatrixServerWeb.Endpoint, MatrixServerWeb.Endpoint,
MatrixServer.RoomServer {Registry, keys: :unique, name: MatrixServer.RoomServer.Registry},
# Start a worker by calling: MatrixServer.Worker.start_link(arg) {DynamicSupervisor, name: MatrixServer.RoomServer.Supervisor, strategy: :one_for_one}
# {MatrixServer.Worker, arg}
] ]
# See https://hexdocs.pm/elixir/Supervisor.html Supervisor.start_link(children, name: MatrixServer.Supervisor, strategy: :one_for_one)
# for other strategies and supported options
opts = [strategy: :one_for_one, name: MatrixServer.Supervisor]
Supervisor.start_link(children, opts)
end end
# Tell Phoenix to update the endpoint configuration # Tell Phoenix to update the endpoint configuration

View file

@ -54,7 +54,7 @@ defmodule MatrixServer.Event do
| type: "m.room.member", | type: "m.room.member",
state_key: sender, state_key: sender,
content: %{ content: %{
"membership" => "invite" "membership" => "join"
} }
} }
end end
@ -111,9 +111,10 @@ defmodule MatrixServer.Event do
room: %Room{id: room_id} room: %Room{id: room_id}
}) do }) do
# TODO: state resolution # TODO: state resolution
create_room_event = create_room(room_id, MatrixServer.get_mxid(localpart), room_version) create_room(room_id, MatrixServer.get_mxid(localpart), room_version)
resolve([events_to_state_set([create_room_event])]) # resolve([events_to_state_set([create_room_event])])
repo.insert(create_room_event) # MatrixServer.StateResolution.resolve(create_room_event)
# repo.insert(create_room_event)
end end
def room_creation_join_creator(repo, %{ def room_creation_join_creator(repo, %{
@ -244,16 +245,18 @@ defmodule MatrixServer.Event do
MapSet.size(events) == 1 MapSet.size(events) == 1
end) end)
unconflicted_state_map = Enum.into(unconflicted, %{}, fn {state_pair, events} -> unconflicted_state_map =
event = MapSet.to_list(events) |> hd() Enum.into(unconflicted, %{}, fn {state_pair, events} ->
event = MapSet.to_list(events) |> hd()
{state_pair, event} {state_pair, event}
end) end)
conflicted_state_set = Enum.reduce(conflicted, MapSet.new(), fn {_, events}, acc -> conflicted_state_set =
MapSet.union(acc, events) Enum.reduce(conflicted, MapSet.new(), fn {_, events}, acc ->
end) MapSet.union(acc, events)
|> MapSet.delete(nil) end)
|> MapSet.delete(nil)
{unconflicted_state_map, conflicted_state_set} {unconflicted_state_map, conflicted_state_set}
end end

View file

@ -3,12 +3,13 @@ defmodule MatrixServer.Room do
import Ecto.Changeset import Ecto.Changeset
alias __MODULE__ alias MatrixServer.{Room, Event}
alias MatrixServerWeb.API.CreateRoom alias MatrixServerWeb.API.CreateRoom
@primary_key {:id, :string, []} @primary_key {:id, :string, []}
schema "rooms" do schema "rooms" do
field :visibility, Ecto.Enum, values: [:public, :private] field :visibility, Ecto.Enum, values: [:public, :private]
has_many :events, Event, foreign_key: :event_id
end end
def changeset(room, params \\ %{}) do def changeset(room, params \\ %{}) do

View file

@ -1,41 +1,76 @@
defmodule MatrixServer.RoomServer do defmodule MatrixServer.RoomServer do
use GenServer use GenServer
alias MatrixServer.{Repo, Room, Event} alias MatrixServer.{Repo, Room, Event, Account}
alias MatrixServerWeb.API.CreateRoom alias MatrixServerWeb.API.CreateRoom
alias Ecto.Multi alias Ecto.Multi
def start_link(_opts) do @registry MatrixServer.RoomServer.Registry
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) @supervisor MatrixServer.RoomServer.Supervisor
def get_room_server(room_id) do
case Registry.lookup(@registry, room_id) do
[{pid, _}] ->
{:ok, pid}
[] ->
opts = [
room_id: room_id,
name: {:via, Registry, {@registry, room_id}}
]
DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts})
end
end end
def create_room(%CreateRoom{} = input, account) do def start_link(opts) do
GenServer.call(__MODULE__, {:create_room, input, account}) {name, opts} = Keyword.pop(opts, :name)
GenServer.start_link(__MODULE__, opts, name: name)
end
def create_room(input, account) do
Multi.new()
|> Multi.insert(:room, Room.create_changeset(input))
|> Multi.run(:room_server, fn _repo, %{room: %Room{id: room_id} = room} ->
opts = [
name: {:via, Registry, {@registry, room_id}},
input: input,
account: account,
room: room
]
DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts})
end)
|> Repo.transaction()
end end
@impl true @impl true
def init(:ok) do def init(opts) do
{:ok, %{}} %Room{id: room_id} = Keyword.fetch!(opts, :room)
input = Keyword.fetch!(opts, :input)
account = Keyword.fetch!(opts, :account)
state = %{
room_id: room_id,
state_set: %{}
}
Repo.transaction(fn ->
with {:ok, create_room_event} <- insert_create_room_event(account, input, state) do
{:ok, state}
end
end)
{:ok, state}
end end
@impl true defp insert_create_room_event(
def handle_call({:create_room, input, account}, _from, state) do %Account{localpart: localpart},
# TODO: preset events, initial_state events, invite, invite_3pid %CreateRoom{room_version: room_version},
result = %{room_id: room_id, state_set: state_set}
Multi.new() ) do
|> Multi.put(:input, input) create_room_event = Event.create_room(room_id, MatrixServer.get_mxid(localpart), room_version)
|> Multi.put(:account, account) MatrixServer.StateResolution.resolve(create_room_event)
|> Multi.insert(:room, Room.create_changeset(input)) {:ok, create_room_event}
|> Multi.run(:create_room_event, &Event.room_creation_create_room/2)
|> Multi.run(:join_creator_event, &Event.room_creation_join_creator/2)
|> Multi.run(:power_levels_event, &Event.room_creation_power_levels/2)
|> Multi.run(:name_event, &Event.room_creation_name/2)
|> Multi.run(:topic_event, &Event.room_creation_topic/2)
|> Multi.run(:temp, fn _, _ ->
{:error, :lol}
end)
|> Repo.transaction()
{:reply, result, state}
end end
end end

View file

@ -1,200 +1,108 @@
# https://matrix.uhoreg.ca/stateres/reloaded.html
defmodule MatrixServer.StateResolution do defmodule MatrixServer.StateResolution do
@derive {Inspect, except: [:prev_events, :auth_events]} import Ecto.Query
defstruct [
:event_id,
:event_type,
:timestamp,
:state_key,
:sender,
:content,
:prev_events,
:auth_events,
:power_levels
]
alias __MODULE__, as: Event alias MatrixServer.{Repo, Event}
@type t :: %Event{event_id: String.t(), event_type: Atom.t(), timestamp: Integer.t()} def example do
%Event{content: content} = event = Event.power_levels("room1", "charlie")
event = %Event{event | content: %{content | "ban" => 0}}
def new_state_event, do: %Event{new() | event_type: :state} event
def new_message_event, do: %Event{new() | event_type: :message} |> Map.put(:prev_events, ["b", "fork"])
|> Map.put(:auth_events, ["create", "join_charlie", "b"])
# TODO: remove state_key default here
def new do
%Event{
event_id: "",
timestamp: 0,
state_key: "",
sender: "",
content: "",
prev_events: [],
auth_events: [],
power_levels: %{}
}
end end
def join(user), do: %Event{membership(user) | content: "join"} def resolve(%Event{type: type, state_key: state_key, event_id: event_id, prev_events: prev_event_ids}) do
def leave(user), do: %Event{membership(user) | content: "leave"} state_sets = Event
def invite(actor, subject), do: %Event{membership(actor, subject) | content: "invite"} |> where([e], e.event_id in ^prev_event_ids)
def kick(actor, subject), do: %Event{membership(actor, subject) | content: "leave"} |> Repo.all()
def ban(actor, subject), do: %Event{membership(actor, subject) | content: "ban"} |> Enum.map(&resolve/1)
def set_power_levels(user, power_levels) do resolved_state = resolve(state_sets)
%Event{new() | event_type: :power_levels, sender: user, power_levels: power_levels} # TODO: check if state event
Map.put(resolved_state, {type, state_key}, event_id)
end end
def set_topic(user, topic) do def resolve([]), do: %{}
%Event{new() | event_type: :topic, sender: user, content: topic}
def resolve(state_sets) do
{unconflicted_state_map, conflicted_state_set} = calculate_conflict(state_sets)
if MapSet.size(conflicted_state_set) == 0 do
unconflicted_state_map
else
full_conflicted_set = MapSet.union(conflicted_state_set, auth_difference(state_sets))
conflicted_control_event_ids =
Enum.filter(full_conflicted_set, &is_control_event/1) |> MapSet.new()
conflicted_control_event_ids_with_auth =
conflicted_control_event_ids
|> MapSet.to_list()
|> full_auth_chain()
|> MapSet.intersection(full_conflicted_set)
|> MapSet.union(conflicted_control_event_ids)
conflicted_control_events_with_auth =
Repo.all(
from e in Event,
where: e.event_id in ^MapSet.to_list(conflicted_control_event_ids_with_auth)
)
sorted_control_events = Enum.sort(conflicted_control_events_with_auth, &rev_top_pow_order/2)
partial_resolved_state = iterative_auth_checks(sorted_control_events, unconflicted_state_map)
other_conflicted_event_ids =
MapSet.difference(full_conflicted_set, conflicted_control_event_ids_with_auth)
other_conflicted_events =
Repo.all(from e in Event, where: e.event_id in ^MapSet.to_list(other_conflicted_event_ids))
resolved_power_levels = partial_resolved_state[{"m.room.power_levels", ""}]
sorted_other_events =
Enum.sort(other_conflicted_events, mainline_order(resolved_power_levels))
nearly_final_state = iterative_auth_checks(sorted_other_events, partial_resolved_state)
Map.merge(nearly_final_state, unconflicted_state_map)
end
end end
def get_state_set_from_event_list(events) do
Enum.reduce(events, %{}, fn
%Event{event_type: event_type, state_key: state_key} = event, acc ->
Map.put(acc, {event_type, state_key}, event)
end)
end
def auth_chain(event), do: auth_chain(event, MapSet.new())
def auth_chain(%Event{auth_events: auth_events}, set) do
Enum.reduce(auth_events, set, fn event, acc ->
event
|> auth_chain()
|> MapSet.union(acc)
|> MapSet.put(event)
end)
end
def in_room(user, state_set) when is_map_key(state_set, {:membership, user}) do
state_set[{:membership, user}].content == "join"
end
def in_room(_, _), do: false
def get_power_levels(state_set) when is_map_key(state_set, {:power_levels, ""}) do
state_set[{:power_levels, ""}].power_levels
end
def get_power_levels(_), do: nil
def has_power_level(_, nil, _), do: true
def has_power_level(user, power_levels, level) do
Map.get(power_levels, user, 0) >= level
end
# No join rules specified, allow joining for room creator only.
def allowed_to_join(user, state_set) when not is_map_key(state_set, {:join_rules, ""}) do
state_set[{:create, ""}].sender == user
end
# TODO: join and power levels events
def is_authorized(%Event{event_type: :create, prev_events: prev_events}, _),
do: prev_events == []
def is_authorized(%Event{event_type: :membership, content: "join", state_key: user}, state_set) do
allowed_to_join(user, state_set)
end
def is_authorized(%Event{sender: sender} = event, state_set) do
in_room(sender, state_set) and
has_power_level(sender, get_power_levels(state_set), get_event_power_level(event))
end
def is_authorized2(%Event{auth_events: auth_events} = event, state_set) do
state_set =
Enum.reduce(auth_events, state_set, fn %Event{event_type: event_type, state_key: state_key} =
event,
acc ->
Map.put_new(acc, {event_type, state_key}, event)
end)
is_authorized(event, state_set)
end
def iterative_auth_checks(events, state_set) do
Enum.reduce(events, state_set, fn event, acc ->
if is_authorized2(event, acc), do: insert_event(event, acc), else: acc
end)
end
def insert_event(%Event{event_type: event_type, state_key: state_key} = event, state_set) do
Map.put(state_set, {event_type, state_key}, event)
end
def is_control_event(%Event{event_type: :power_levels, state_key: ""}), do: true
def is_control_event(%Event{event_type: :join_rules, state_key: ""}), do: true
def is_control_event(%Event{
event_type: :membership,
state_key: state_key,
sender: sender,
content: "ban"
}),
do: sender != state_key
def is_control_event(%Event{
event_type: :membership,
state_key: state_key,
sender: sender,
content: "leave"
}),
do: sender != state_key
def is_control_event(_), do: false
def calculate_conflict(state_sets) do def calculate_conflict(state_sets) do
domain = {unconflicted, conflicted} =
state_sets state_sets
|> Enum.map(&Map.keys/1) |> Enum.flat_map(&Map.keys/1)
|> List.flatten()
|> MapSet.new() |> MapSet.new()
|> Enum.into(%{}, fn state_pair ->
full_state_map_list =
Enum.map(domain, fn k ->
events = events =
Enum.map(state_sets, &Map.get(&1, k)) Enum.map(state_sets, &Map.get(&1, state_pair))
|> MapSet.new() |> MapSet.new()
{k, events} {state_pair, events}
end) end)
|> Enum.split_with(fn {_, events} ->
{unconflicted, conflicted} =
Enum.split_with(full_state_map_list, fn {_k, events} ->
MapSet.size(events) == 1 MapSet.size(events) == 1
end) end)
unconflicted_state_map = unconflicted_state_map =
Enum.map(unconflicted, fn {k, events} -> Enum.into(unconflicted, %{}, fn {state_pair, events} ->
event = event = MapSet.to_list(events) |> hd()
events
|> MapSet.to_list()
|> hd()
{k, event} {state_pair, event}
end) end)
|> Enum.into(%{})
conflicted_state_map = conflicted_state_set =
Enum.flat_map(conflicted, fn {_, events} -> Enum.reduce(conflicted, MapSet.new(), fn {_, events}, acc ->
events MapSet.union(acc, events)
|> MapSet.delete(nil)
|> MapSet.to_list()
end) end)
|> MapSet.new() |> MapSet.delete(nil)
{unconflicted_state_map, conflicted_state_map} {unconflicted_state_map, conflicted_state_set}
end
def full_auth_chain(events) do
events
|> Enum.map(&auth_chain/1)
|> Enum.reduce(MapSet.new(), &MapSet.union/2)
end end
def auth_difference(state_sets) do def auth_difference(state_sets) do
# TODO: memoization possible
full_auth_chains = full_auth_chains =
Enum.map(state_sets, fn state_set -> Enum.map(state_sets, fn state_set ->
state_set state_set
@ -208,9 +116,50 @@ defmodule MatrixServer.StateResolution do
MapSet.difference(auth_chain_union, auth_chain_intersection) MapSet.difference(auth_chain_union, auth_chain_intersection)
end end
def full_auth_chain(event_ids) do
event_ids
|> Enum.map(&auth_chain/1)
|> Enum.reduce(MapSet.new(), &MapSet.union/2)
end
def auth_chain(event_id) do
# TODO: handle when auth event is not found.
Event
|> where([e], e.event_id == ^event_id)
|> select([e], e.auth_events)
|> Repo.one!()
|> Enum.reduce(MapSet.new(), fn auth_event_id, acc ->
auth_event_id
|> auth_chain()
|> MapSet.union(acc)
|> MapSet.put(auth_event_id)
end)
end
def is_control_event(event_id) when is_binary(event_id) do
Event
|> where([e], e.event_id == ^event_id)
|> Repo.one!()
|> is_control_event()
end
def is_control_event(%Event{type: "m.room.power_levels", state_key: ""}), do: true
def is_control_event(%Event{type: "m.room.join_rules", state_key: ""}), do: true
def is_control_event(%Event{
type: "m.room.member",
state_key: state_key,
sender: sender,
content: %{membership: membership}
})
when sender != state_key and membership in ["leave", "ban"],
do: true
def is_control_event(_), do: false
def rev_top_pow_order( def rev_top_pow_order(
%Event{timestamp: timestamp1, event_id: event_id1} = event1, %Event{origin_server_ts: timestamp1, event_id: event_id1} = event1,
%Event{timestamp: timestamp2, event_id: event_id2} = event2 %Event{origin_server_ts: timestamp2, event_id: event_id2} = event2
) do ) do
{power1, power2} = {get_power_level(event1), get_power_level(event2)} {power1, power2} = {get_power_level(event1), get_power_level(event2)}
@ -225,37 +174,31 @@ defmodule MatrixServer.StateResolution do
end end
end end
def get_power_level(%Event{sender: sender, auth_events: auth_events}) do def get_power_level(%Event{sender: sender, auth_events: auth_event_ids}) do
pl_event = Enum.find(auth_events, &(&1.event_type == :power_levels)) pl_content =
Event
|> where([e], e.event_id in ^auth_event_ids)
|> where([e], e.type == "m.room.power_levels")
|> select([e], e.content)
|> Repo.one()
case pl_event do case pl_content do
%Event{power_levels: power_levels} -> Map.get(power_levels, sender, 0) %{"users" => pl_users} -> Map.get(pl_users, sender, 0)
_ -> 0 nil -> 0
end end
end end
def mainline(event) do def mainline_order(event_id) do
event
|> mainline([])
|> Enum.reverse()
end
def mainline(%Event{auth_events: auth_events} = event, acc) do
case Enum.find(auth_events, &(&1.event_type == :power_levels)) do
nil -> [event | acc]
pl_event -> mainline(pl_event, [event | acc])
end
end
def mainline_order(p) do
mainline_map = mainline_map =
p Event
|> where([e], e.event_id == ^event_id)
|> Repo.one!()
|> mainline() |> mainline()
|> Enum.with_index() |> Enum.with_index()
|> Enum.into(%{}) |> Enum.into(%{})
fn %Event{timestamp: timestamp1, event_id: event_id1} = event1, fn %Event{origin_server_ts: timestamp1, event_id: event_id1} = event1,
%Event{timestamp: timestamp2, event_id: event_id2} = event2 -> %Event{origin_server_ts: timestamp2, event_id: event_id2} = event2 ->
mainline_depth1 = get_mainline_depth(mainline_map, event1) mainline_depth1 = get_mainline_depth(mainline_map, event1)
mainline_depth2 = get_mainline_depth(mainline_map, event2) mainline_depth2 = get_mainline_depth(mainline_map, event2)
@ -289,181 +232,110 @@ defmodule MatrixServer.StateResolution do
end end
end end
def resolve(state_sets) do def mainline(event) do
{unconflicted_state_map, conflicted_set} = calculate_conflict(state_sets) event
full_conflicted_set = MapSet.union(conflicted_set, auth_difference(state_sets)) |> mainline([])
|> Enum.reverse()
end
conflicted_control_events = def mainline(%Event{auth_events: auth_event_ids} = event, acc) do
Enum.filter(full_conflicted_set, &is_control_event/1) |> MapSet.new() pl_event =
Event
|> where([e], e.event_id in ^auth_event_ids)
|> where([e], e.type == "m.room.power_levels")
|> Repo.one()
conflicted_control_events_with_auth = case pl_event do
MapSet.union( %Event{} -> mainline(pl_event, [event | acc])
conflicted_control_events, nil -> [event | acc]
MapSet.intersection( end
full_conflicted_set, end
full_auth_chain(MapSet.to_list(conflicted_control_events))
) def iterative_auth_checks(events, state_set) do
Enum.reduce(events, state_set, fn event, acc ->
if is_authorized2(event, acc), do: insert_event(event, acc), else: acc
end)
end
def insert_event(%Event{type: event_type, state_key: state_key, event_id: event_id}, state_set) do
Map.put(state_set, {event_type, state_key}, event_id)
end
def is_authorized2(%Event{auth_events: auth_event_ids} = event, state_set) do
state_set =
Event
|> where([e], e.event_id in ^auth_event_ids)
|> Repo.all()
|> Enum.reduce(state_set, fn %Event{
type: event_type,
state_key: state_key,
event_id: event_id
},
acc ->
Map.put_new(acc, {event_type, state_key}, event_id)
end)
is_authorized(event, state_set)
end
# TODO: join and power levels events
def is_authorized(%Event{type: "m.room.create", prev_events: prev_events}, _),
do: prev_events == []
def is_authorized(
%Event{type: "m.room.member", content: %{"membership" => "join"}, state_key: user},
state_set
) do
allowed_to_join(user, state_set)
end
def is_authorized(%Event{sender: sender} = event, state_set) do
in_room(sender, state_set) and
has_power_level(sender, get_power_levels(state_set), get_event_power_level(event))
end
def in_room(user, state_set) when is_map_key(state_set, {"m.room.member", user}) do
content =
Repo.one!(
from e in Event,
where: e.event_id == ^state_set[{"m.room.member", user}],
select: e.content
) )
sorted_control_events = Enum.sort(conflicted_control_events_with_auth, &rev_top_pow_order/2) case content["membership"] do
partial_resolved_state = iterative_auth_checks(sorted_control_events, unconflicted_state_map) "join" -> true
_ -> false
other_conflicted_events = end
MapSet.difference(full_conflicted_set, conflicted_control_events_with_auth)
resolved_power_levels = partial_resolved_state[{:power_levels, ""}]
sorted_other_events =
Enum.sort(other_conflicted_events, mainline_order(resolved_power_levels))
nearly_final_state = iterative_auth_checks(sorted_other_events, partial_resolved_state)
Map.merge(nearly_final_state, unconflicted_state_map)
end end
def example1 do def in_room(_, _), do: false
create = %Event{new() | event_id: "create", event_type: :create, sender: "@alice:example.com"}
alice_joins = %Event{ def get_power_levels(state_set) when is_map_key(state_set, {"m.room.power_levels", ""}) do
join("@alice:example.com") Repo.one!(
| event_id: "alice joins", from e in Event,
prev_events: [create], where: e.event_id == ^state_set[{"m.room.power_levels", ""}],
auth_events: [create] select: e.content
} )
pl = %Event{
set_power_levels("@alice:example.com", %{"@alice:example.com" => 100})
| event_id: "power level",
prev_events: [alice_joins],
auth_events: [alice_joins, create]
}
join_rules = %Event{
new()
| event_id: "join rules",
event_type: :join_rules,
sender: "@alice:example.com",
content: "private",
prev_events: [pl],
auth_events: [pl, alice_joins, create]
}
invite_bob = %Event{
invite("@alice:example.com", "@bob:example.com")
| event_id: "invite bob",
prev_events: [join_rules],
auth_events: [pl, alice_joins, create]
}
invite_carol = %Event{
invite("@alice:example.com", "@carol:example.com")
| event_id: "invite carol",
prev_events: [invite_bob],
auth_events: [pl, alice_joins, create]
}
bob_join = %Event{
join("@bob:example.com")
| event_id: "bob joins",
prev_events: [invite_carol],
auth_events: [invite_bob, join_rules, create]
}
[create, alice_joins, pl, join_rules, invite_bob, invite_carol, bob_join]
end end
def example2 do def get_power_levels(_), do: nil
create = %Event{
new_state_event()
| event_id: "create",
event_type: :create,
sender: "@alice:example.com"
}
alice_joins = join("@alice:example.com") def has_power_level(user, %{"users" => users}, level) do
Map.get(users, user, 0) >= level
pl1 = %Event{
set_power_levels("@alice:example.com", %{"@alice:example.com" => 100})
| event_id: "power levels 1"
}
pl2 = %Event{
set_power_levels("@alice:example.com", %{
"@alice:example.com" => 100,
"@bob:example.com" => 50
})
| event_id: "power levels 2"
}
topic = %Event{set_topic("@alice:example.com", "This is a topic") | event_id: "topic"}
state_set1 = get_state_set_from_event_list([create, alice_joins, pl1])
state_set2 = get_state_set_from_event_list([create, alice_joins, pl2, topic])
state_set3 = get_state_set_from_event_list([create, alice_joins, pl2])
[state_set1, state_set2, state_set3]
end end
def example3 do def has_power_level(_, _, _) do
pl1 = %Event{set_power_levels("alice", %{}) | event_id: "pl1", timestamp: 1} true
pl2 = %Event{
set_power_levels("alice", %{})
| event_id: "pl2",
auth_events: [pl1],
timestamp: 2
}
pl3 = %Event{
set_power_levels("alice", %{})
| event_id: "pl3",
auth_events: [pl1],
timestamp: 4
}
pl4 = %Event{
set_power_levels("alice", %{})
| event_id: "pl4",
auth_events: [pl2],
timestamp: 6
}
pl5 = %Event{
set_power_levels("alice", %{})
| event_id: "pl5",
auth_events: [pl4],
timestamp: 6
}
pl6 = %Event{
set_power_levels("alice", %{})
| event_id: "pl6",
auth_events: [pl4],
timestamp: 5
}
pl7 = %Event{
set_power_levels("alice", %{})
| event_id: "pl7",
auth_events: [pl2],
timestamp: 5
}
pl8 = %Event{
set_power_levels("alice", %{})
| event_id: "pl8",
auth_events: [pl7],
timestamp: 6
}
[pl1, pl2, pl3, pl4, pl5, pl6, pl7, pl8]
end
defp membership(user), do: membership(user, user)
defp membership(actor, subject) do
%Event{new() | event_type: :membership, sender: actor, state_key: subject}
end end
defp get_event_power_level(%Event{state_key: ""}), do: 0 defp get_event_power_level(%Event{state_key: ""}), do: 0
defp get_event_power_level(_), do: 50 defp get_event_power_level(_), do: 50
# No join rules specified, allow joining for room creator only.
def allowed_to_join(user, state_set)
when not is_map_key(state_set, {"m.room.join_rules", ""}) do
Repo.one!(
from e in Event, where: e.event_id == ^state_set[{"m.room.create", ""}], select: e.sender
) == user
end
end end

View file

@ -0,0 +1,469 @@
# https://matrix.uhoreg.ca/stateres/reloaded.html
defmodule MatrixServer.StateResolutionExample do
@derive {Inspect, except: [:prev_events, :auth_events]}
defstruct [
:event_id,
:event_type,
:timestamp,
:state_key,
:sender,
:content,
:prev_events,
:auth_events,
:power_levels
]
alias __MODULE__, as: Event
@type t :: %Event{event_id: String.t(), event_type: Atom.t(), timestamp: Integer.t()}
def new_state_event, do: %Event{new() | event_type: :state}
def new_message_event, do: %Event{new() | event_type: :message}
# TODO: remove state_key default here
def new do
%Event{
event_id: "",
timestamp: 0,
state_key: "",
sender: "",
content: "",
prev_events: [],
auth_events: [],
power_levels: %{}
}
end
def join(user), do: %Event{membership(user) | content: "join"}
def leave(user), do: %Event{membership(user) | content: "leave"}
def invite(actor, subject), do: %Event{membership(actor, subject) | content: "invite"}
def kick(actor, subject), do: %Event{membership(actor, subject) | content: "leave"}
def ban(actor, subject), do: %Event{membership(actor, subject) | content: "ban"}
def set_power_levels(user, power_levels) do
%Event{new() | event_type: :power_levels, sender: user, power_levels: power_levels}
end
def set_topic(user, topic) do
%Event{new() | event_type: :topic, sender: user, content: topic}
end
def get_state_set_from_event_list(events) do
Enum.reduce(events, %{}, fn
%Event{event_type: event_type, state_key: state_key} = event, acc ->
Map.put(acc, {event_type, state_key}, event)
end)
end
def auth_chain(event), do: auth_chain(event, MapSet.new())
def auth_chain(%Event{auth_events: auth_events}, set) do
Enum.reduce(auth_events, set, fn event, acc ->
event
|> auth_chain()
|> MapSet.union(acc)
|> MapSet.put(event)
end)
end
def in_room(user, state_set) when is_map_key(state_set, {:membership, user}) do
state_set[{:membership, user}].content == "join"
end
def in_room(_, _), do: false
def get_power_levels(state_set) when is_map_key(state_set, {:power_levels, ""}) do
state_set[{:power_levels, ""}].power_levels
end
def get_power_levels(_), do: nil
def has_power_level(_, nil, _), do: true
def has_power_level(user, power_levels, level) do
Map.get(power_levels, user, 0) >= level
end
# No join rules specified, allow joining for room creator only.
def allowed_to_join(user, state_set) when not is_map_key(state_set, {:join_rules, ""}) do
state_set[{:create, ""}].sender == user
end
# TODO: join and power levels events
def is_authorized(%Event{event_type: :create, prev_events: prev_events}, _),
do: prev_events == []
def is_authorized(%Event{event_type: :membership, content: "join", state_key: user}, state_set) do
allowed_to_join(user, state_set)
end
def is_authorized(%Event{sender: sender} = event, state_set) do
in_room(sender, state_set) and
has_power_level(sender, get_power_levels(state_set), get_event_power_level(event))
end
def is_authorized2(%Event{auth_events: auth_events} = event, state_set) do
state_set =
Enum.reduce(auth_events, state_set, fn %Event{event_type: event_type, state_key: state_key} =
event,
acc ->
Map.put_new(acc, {event_type, state_key}, event)
end)
is_authorized(event, state_set)
end
def iterative_auth_checks(events, state_set) do
Enum.reduce(events, state_set, fn event, acc ->
if is_authorized2(event, acc), do: insert_event(event, acc), else: acc
end)
end
def insert_event(%Event{event_type: event_type, state_key: state_key} = event, state_set) do
Map.put(state_set, {event_type, state_key}, event)
end
def is_control_event(%Event{event_type: :power_levels, state_key: ""}), do: true
def is_control_event(%Event{event_type: :join_rules, state_key: ""}), do: true
def is_control_event(%Event{
event_type: :membership,
state_key: state_key,
sender: sender,
content: "ban"
}),
do: sender != state_key
def is_control_event(%Event{
event_type: :membership,
state_key: state_key,
sender: sender,
content: "leave"
}),
do: sender != state_key
def is_control_event(_), do: false
def calculate_conflict(state_sets) do
domain =
state_sets
|> Enum.map(&Map.keys/1)
|> List.flatten()
|> MapSet.new()
full_state_map_list =
Enum.map(domain, fn k ->
events =
Enum.map(state_sets, &Map.get(&1, k))
|> MapSet.new()
{k, events}
end)
{unconflicted, conflicted} =
Enum.split_with(full_state_map_list, fn {_k, events} ->
MapSet.size(events) == 1
end)
unconflicted_state_map =
Enum.map(unconflicted, fn {k, events} ->
event =
events
|> MapSet.to_list()
|> hd()
{k, event}
end)
|> Enum.into(%{})
conflicted_state_map =
Enum.flat_map(conflicted, fn {_, events} ->
events
|> MapSet.delete(nil)
|> MapSet.to_list()
end)
|> MapSet.new()
{unconflicted_state_map, conflicted_state_map}
end
def full_auth_chain(events) do
events
|> Enum.map(&auth_chain/1)
|> Enum.reduce(MapSet.new(), &MapSet.union/2)
end
def auth_difference(state_sets) do
full_auth_chains =
Enum.map(state_sets, fn state_set ->
state_set
|> Map.values()
|> full_auth_chain()
end)
auth_chain_union = Enum.reduce(full_auth_chains, MapSet.new(), &MapSet.union/2)
auth_chain_intersection = Enum.reduce(full_auth_chains, MapSet.new(), &MapSet.intersection/2)
MapSet.difference(auth_chain_union, auth_chain_intersection)
end
def rev_top_pow_order(
%Event{timestamp: timestamp1, event_id: event_id1} = event1,
%Event{timestamp: timestamp2, event_id: event_id2} = event2
) do
{power1, power2} = {get_power_level(event1), get_power_level(event2)}
if power1 == power2 do
if timestamp1 == timestamp2 do
event_id1 <= event_id2
else
timestamp1 < timestamp2
end
else
power1 < power2
end
end
def get_power_level(%Event{sender: sender, auth_events: auth_events}) do
pl_event = Enum.find(auth_events, &(&1.event_type == :power_levels))
case pl_event do
%Event{power_levels: power_levels} -> Map.get(power_levels, sender, 0)
_ -> 0
end
end
def mainline(event) do
event
|> mainline([])
|> Enum.reverse()
end
def mainline(%Event{auth_events: auth_events} = event, acc) do
case Enum.find(auth_events, &(&1.event_type == :power_levels)) do
nil -> [event | acc]
pl_event -> mainline(pl_event, [event | acc])
end
end
def mainline_order(p) do
mainline_map =
p
|> mainline()
|> Enum.with_index()
|> Enum.into(%{})
fn %Event{timestamp: timestamp1, event_id: event_id1} = event1,
%Event{timestamp: timestamp2, event_id: event_id2} = event2 ->
mainline_depth1 = get_mainline_depth(mainline_map, event1)
mainline_depth2 = get_mainline_depth(mainline_map, event2)
if mainline_depth1 == mainline_depth2 do
if timestamp1 == timestamp2 do
event_id1 <= event_id2
else
timestamp1 < timestamp2
end
else
mainline_depth1 < mainline_depth2
end
end
end
defp get_mainline_depth(mainline_map, event) do
mainline = mainline(event)
result =
Enum.find_value(mainline, fn mainline_event ->
if Map.has_key?(mainline_map, mainline_event) do
{:ok, mainline_map[mainline_event]}
else
nil
end
end)
case result do
{:ok, index} -> -index
nil -> nil
end
end
def resolve(state_sets) do
{unconflicted_state_map, conflicted_set} = calculate_conflict(state_sets)
full_conflicted_set = MapSet.union(conflicted_set, auth_difference(state_sets))
conflicted_control_events =
Enum.filter(full_conflicted_set, &is_control_event/1) |> MapSet.new()
conflicted_control_events_with_auth =
MapSet.union(
conflicted_control_events,
MapSet.intersection(
full_conflicted_set,
full_auth_chain(MapSet.to_list(conflicted_control_events))
)
)
sorted_control_events = Enum.sort(conflicted_control_events_with_auth, &rev_top_pow_order/2)
partial_resolved_state = iterative_auth_checks(sorted_control_events, unconflicted_state_map)
other_conflicted_events =
MapSet.difference(full_conflicted_set, conflicted_control_events_with_auth)
resolved_power_levels = partial_resolved_state[{:power_levels, ""}]
sorted_other_events =
Enum.sort(other_conflicted_events, mainline_order(resolved_power_levels))
nearly_final_state = iterative_auth_checks(sorted_other_events, partial_resolved_state)
Map.merge(nearly_final_state, unconflicted_state_map)
end
def example1 do
create = %Event{new() | event_id: "create", event_type: :create, sender: "@alice:example.com"}
alice_joins = %Event{
join("@alice:example.com")
| event_id: "alice joins",
prev_events: [create],
auth_events: [create]
}
pl = %Event{
set_power_levels("@alice:example.com", %{"@alice:example.com" => 100})
| event_id: "power level",
prev_events: [alice_joins],
auth_events: [alice_joins, create]
}
join_rules = %Event{
new()
| event_id: "join rules",
event_type: :join_rules,
sender: "@alice:example.com",
content: "private",
prev_events: [pl],
auth_events: [pl, alice_joins, create]
}
invite_bob = %Event{
invite("@alice:example.com", "@bob:example.com")
| event_id: "invite bob",
prev_events: [join_rules],
auth_events: [pl, alice_joins, create]
}
invite_carol = %Event{
invite("@alice:example.com", "@carol:example.com")
| event_id: "invite carol",
prev_events: [invite_bob],
auth_events: [pl, alice_joins, create]
}
bob_join = %Event{
join("@bob:example.com")
| event_id: "bob joins",
prev_events: [invite_carol],
auth_events: [invite_bob, join_rules, create]
}
[create, alice_joins, pl, join_rules, invite_bob, invite_carol, bob_join]
end
def example2 do
create = %Event{
new_state_event()
| event_id: "create",
event_type: :create,
sender: "@alice:example.com"
}
alice_joins = join("@alice:example.com")
pl1 = %Event{
set_power_levels("@alice:example.com", %{"@alice:example.com" => 100})
| event_id: "power levels 1"
}
pl2 = %Event{
set_power_levels("@alice:example.com", %{
"@alice:example.com" => 100,
"@bob:example.com" => 50
})
| event_id: "power levels 2"
}
topic = %Event{set_topic("@alice:example.com", "This is a topic") | event_id: "topic"}
state_set1 = get_state_set_from_event_list([create, alice_joins, pl1])
state_set2 = get_state_set_from_event_list([create, alice_joins, pl2, topic])
state_set3 = get_state_set_from_event_list([create, alice_joins, pl2])
[state_set1, state_set2, state_set3]
end
def example3 do
pl1 = %Event{set_power_levels("alice", %{}) | event_id: "pl1", timestamp: 1}
pl2 = %Event{
set_power_levels("alice", %{})
| event_id: "pl2",
auth_events: [pl1],
timestamp: 2
}
pl3 = %Event{
set_power_levels("alice", %{})
| event_id: "pl3",
auth_events: [pl1],
timestamp: 4
}
pl4 = %Event{
set_power_levels("alice", %{})
| event_id: "pl4",
auth_events: [pl2],
timestamp: 6
}
pl5 = %Event{
set_power_levels("alice", %{})
| event_id: "pl5",
auth_events: [pl4],
timestamp: 6
}
pl6 = %Event{
set_power_levels("alice", %{})
| event_id: "pl6",
auth_events: [pl4],
timestamp: 5
}
pl7 = %Event{
set_power_levels("alice", %{})
| event_id: "pl7",
auth_events: [pl2],
timestamp: 5
}
pl8 = %Event{
set_power_levels("alice", %{})
| event_id: "pl8",
auth_events: [pl7],
timestamp: 6
}
[pl1, pl2, pl3, pl4, pl5, pl6, pl7, pl8]
end
defp membership(user), do: membership(user, user)
defp membership(actor, subject) do
%Event{new() | event_type: :membership, sender: actor, state_key: subject}
end
defp get_event_power_level(%Event{state_key: ""}), do: 0
defp get_event_power_level(_), do: 50
end

View file

@ -1,11 +1,73 @@
# Script for populating the database. You can run it as: alias MatrixServer.{Repo, Room, Event}
#
# mix run priv/repo/seeds.exs # Auth difference example from here:
# # https://matrix.org/docs/guides/implementing-stateres#auth-differences
# Inside the script, you can read and write to any of your
# repositories directly: Repo.insert!(%Room{
# id: "room1",
# MatrixServer.Repo.insert!(%MatrixServer.SomeSchema{}) visibility: :public
# })
# We recommend using the bang functions (`insert!`, `update!`
# and so on) as they will fail if something goes wrong. Repo.insert!(
Event.create_room("room1", "alice", "v1")
|> Map.put(:origin_server_ts, 0)
|> Map.put(:event_id, "create")
)
Repo.insert!(
Event.join("room1", "alice")
|> Map.put(:prev_events, ["create"])
|> Map.put(:auth_events, ["create"])
|> Map.put(:origin_server_ts, 1)
|> Map.put(:event_id, "join_alice")
)
Repo.insert!(
Event.join("room1", "bob")
|> Map.put(:prev_events, ["join_alice"])
|> Map.put(:auth_events, ["create"])
|> Map.put(:origin_server_ts, 2)
|> Map.put(:event_id, "join_bob")
)
Repo.insert!(
Event.join("room1", "charlie")
|> Map.put(:prev_events, ["join_bob"])
|> Map.put(:auth_events, ["create"])
|> Map.put(:origin_server_ts, 3)
|> Map.put(:event_id, "join_charlie")
)
%Event{content: content} = event = Event.power_levels("room1", "alice")
event = %Event{event | content: %{content | "users" => %{"alice" => 100, "bob" => 100}}}
Repo.insert!(
event
|> Map.put(:prev_events, ["join_alice"])
|> Map.put(:auth_events, ["create", "join_alice"])
|> Map.put(:origin_server_ts, 4)
|> Map.put(:event_id, "a")
)
%Event{content: content} = event = Event.power_levels("room1", "bob")
event = %Event{
event
| content: %{content | "users" => %{"alice" => 100, "bob" => 100, "charlie" => 100}}
}
Repo.insert!(
event
|> Map.put(:prev_events, ["a"])
|> Map.put(:auth_events, ["create", "join_bob", "a"])
|> Map.put(:origin_server_ts, 5)
|> Map.put(:event_id, "b")
)
Repo.insert!(
Event.room_topic("room1", "alice", "sneed")
|> Map.put(:prev_events, ["a"])
|> Map.put(:auth_events, ["create", "join_alice", "a"])
|> Map.put(:origin_server_ts, 5)
|> Map.put(:event_id, "fork")
)