Resolve state when inserting create room event
Cleanup code
This commit is contained in:
parent
135df49d1b
commit
b9156ed8b3
4 changed files with 83 additions and 242 deletions
|
@ -3,8 +3,7 @@ defmodule MatrixServer.Event do
|
||||||
|
|
||||||
import Ecto.Changeset
|
import Ecto.Changeset
|
||||||
|
|
||||||
alias MatrixServer.{Room, Event, Account}
|
alias MatrixServer.{Room, Event}
|
||||||
alias MatrixServerWeb.API.CreateRoom
|
|
||||||
|
|
||||||
@primary_key {:event_id, :string, []}
|
@primary_key {:event_id, :string, []}
|
||||||
schema "events" do
|
schema "events" do
|
||||||
|
@ -19,7 +18,6 @@ defmodule MatrixServer.Event do
|
||||||
end
|
end
|
||||||
|
|
||||||
def changeset(event, params \\ %{}) do
|
def changeset(event, params \\ %{}) do
|
||||||
# TODO: prev/auth events?
|
|
||||||
event
|
event
|
||||||
|> cast(params, [:type, :timestamp, :state_key, :sender, :content])
|
|> cast(params, [:type, :timestamp, :state_key, :sender, :content])
|
||||||
|> validate_required([:type, :timestamp, :sender])
|
|> validate_required([:type, :timestamp, :sender])
|
||||||
|
@ -105,159 +103,7 @@ defmodule MatrixServer.Event do
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
def room_creation_create_room(repo, %{
|
|
||||||
input: %CreateRoom{room_version: room_version},
|
|
||||||
account: %Account{localpart: localpart},
|
|
||||||
room: %Room{id: room_id}
|
|
||||||
}) do
|
|
||||||
# TODO: state resolution
|
|
||||||
create_room(room_id, MatrixServer.get_mxid(localpart), room_version)
|
|
||||||
# resolve([events_to_state_set([create_room_event])])
|
|
||||||
# MatrixServer.StateResolution.resolve(create_room_event)
|
|
||||||
# repo.insert(create_room_event)
|
|
||||||
end
|
|
||||||
|
|
||||||
def room_creation_join_creator(repo, %{
|
|
||||||
room: %Room{id: room_id},
|
|
||||||
create_room_event: %Event{sender: creator, event_id: create_room_id}
|
|
||||||
}) do
|
|
||||||
# TODO: state resolution
|
|
||||||
join(room_id, creator)
|
|
||||||
|> Map.put(:prev_events, [create_room_id])
|
|
||||||
|> Map.put(:auth_events, [create_room_id])
|
|
||||||
|> repo.insert()
|
|
||||||
end
|
|
||||||
|
|
||||||
def room_creation_power_levels(
|
|
||||||
repo,
|
|
||||||
%{
|
|
||||||
room: %Room{id: room_id},
|
|
||||||
create_room_event: %Event{sender: creator, event_id: create_room_id},
|
|
||||||
join_creator_event: %Event{event_id: join_creator_id}
|
|
||||||
}
|
|
||||||
) do
|
|
||||||
# TODO: state resolution
|
|
||||||
power_levels(room_id, creator)
|
|
||||||
|> Map.put(:prev_events, [join_creator_id])
|
|
||||||
|> Map.put(:auth_events, [create_room_id, join_creator_id])
|
|
||||||
|> repo.insert()
|
|
||||||
end
|
|
||||||
|
|
||||||
def room_creation_name(_repo, %{input: %CreateRoom{name: nil}}), do: {:ok, nil}
|
|
||||||
|
|
||||||
def room_creation_name(_repo, %{input: %CreateRoom{name: name}}) when byte_size(name) > 255,
|
|
||||||
do: {:error, :name}
|
|
||||||
|
|
||||||
def room_creation_name(
|
|
||||||
repo,
|
|
||||||
%{
|
|
||||||
input: %CreateRoom{name: name},
|
|
||||||
room: %Room{id: room_id},
|
|
||||||
create_room_event: %Event{sender: creator, event_id: create_room_id},
|
|
||||||
join_creator_event: %Event{event_id: join_creator_id},
|
|
||||||
power_levels_event: %Event{event_id: power_levels_id}
|
|
||||||
}
|
|
||||||
) do
|
|
||||||
# TODO: state resolution
|
|
||||||
room_name(room_id, creator, name)
|
|
||||||
|> Map.put(:prev_events, [power_levels_id])
|
|
||||||
|> Map.put(:auth_events, [create_room_id, join_creator_id, power_levels_id])
|
|
||||||
|> repo.insert()
|
|
||||||
end
|
|
||||||
|
|
||||||
def room_creation_topic(_repo, %{input: %CreateRoom{topic: nil}}), do: {:ok, nil}
|
|
||||||
|
|
||||||
def room_creation_topic(
|
|
||||||
repo,
|
|
||||||
%{
|
|
||||||
input: %CreateRoom{topic: topic},
|
|
||||||
room: %Room{id: room_id},
|
|
||||||
create_room_event: %Event{sender: creator, event_id: create_room_id},
|
|
||||||
join_creator_event: %Event{event_id: join_creator_id},
|
|
||||||
power_levels_event: %Event{event_id: power_levels_id},
|
|
||||||
name_event: name_event
|
|
||||||
}
|
|
||||||
) do
|
|
||||||
# TODO: state resolution
|
|
||||||
prev_event = if name_event, do: name_event.event_id, else: power_levels_id
|
|
||||||
|
|
||||||
room_topic(room_id, creator, topic)
|
|
||||||
|> Map.put(:prev_events, [prev_event])
|
|
||||||
|> Map.put(:auth_events, [create_room_id, join_creator_id, power_levels_id])
|
|
||||||
|> repo.insert()
|
|
||||||
end
|
|
||||||
|
|
||||||
def generate_event_id do
|
def generate_event_id do
|
||||||
"$" <> MatrixServer.random_string(17) <> ":" <> MatrixServer.server_name()
|
"$" <> MatrixServer.random_string(17) <> ":" <> MatrixServer.server_name()
|
||||||
end
|
end
|
||||||
|
|
||||||
def events_to_state_set(events) do
|
|
||||||
Enum.into(events, %{}, fn %Event{type: type, state_key: state_key} = event ->
|
|
||||||
{{type, state_key}, event}
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
def resolve(state_sets) do
|
|
||||||
{unconflicted_state_map, conflicted_set} = calculate_conflict(state_sets)
|
|
||||||
# full_conflicted_set = MapSet.union(conflicted_set, auth_difference(state_sets))
|
|
||||||
|
|
||||||
# conflicted_control_events =
|
|
||||||
# Enum.filter(full_conflicted_set, &is_control_event/1) |> MapSet.new()
|
|
||||||
|
|
||||||
# conflicted_control_events_with_auth =
|
|
||||||
# MapSet.union(
|
|
||||||
# conflicted_control_events,
|
|
||||||
# MapSet.intersection(
|
|
||||||
# full_conflicted_set,
|
|
||||||
# full_auth_chain(MapSet.to_list(conflicted_control_events))
|
|
||||||
# )
|
|
||||||
# )
|
|
||||||
|
|
||||||
# sorted_control_events = Enum.sort(conflicted_control_events_with_auth, &rev_top_pow_order/2)
|
|
||||||
# partial_resolved_state = iterative_auth_checks(sorted_control_events, unconflicted_state_map)
|
|
||||||
|
|
||||||
# other_conflicted_events =
|
|
||||||
# MapSet.difference(full_conflicted_set, conflicted_control_events_with_auth)
|
|
||||||
|
|
||||||
# resolved_power_levels = partial_resolved_state[{:power_levels, ""}]
|
|
||||||
|
|
||||||
# sorted_other_events =
|
|
||||||
# Enum.sort(other_conflicted_events, mainline_order(resolved_power_levels))
|
|
||||||
|
|
||||||
# nearly_final_state = iterative_auth_checks(sorted_other_events, partial_resolved_state)
|
|
||||||
|
|
||||||
# Map.merge(nearly_final_state, unconflicted_state_map)
|
|
||||||
end
|
|
||||||
|
|
||||||
def calculate_conflict(state_sets) do
|
|
||||||
{unconflicted, conflicted} =
|
|
||||||
state_sets
|
|
||||||
|> Enum.flat_map(&Map.keys/1)
|
|
||||||
|> MapSet.new()
|
|
||||||
|> Enum.map(fn state_pair ->
|
|
||||||
events =
|
|
||||||
Enum.map(state_sets, &Map.get(&1, state_pair))
|
|
||||||
|> MapSet.new()
|
|
||||||
|
|
||||||
{state_pair, events}
|
|
||||||
end)
|
|
||||||
|> Enum.split_with(fn {_k, events} ->
|
|
||||||
MapSet.size(events) == 1
|
|
||||||
end)
|
|
||||||
|
|
||||||
unconflicted_state_map =
|
|
||||||
Enum.into(unconflicted, %{}, fn {state_pair, events} ->
|
|
||||||
event = MapSet.to_list(events) |> hd()
|
|
||||||
|
|
||||||
{state_pair, event}
|
|
||||||
end)
|
|
||||||
|
|
||||||
conflicted_state_set =
|
|
||||||
Enum.reduce(conflicted, MapSet.new(), fn {_, events}, acc ->
|
|
||||||
MapSet.union(acc, events)
|
|
||||||
end)
|
|
||||||
|> MapSet.delete(nil)
|
|
||||||
|
|
||||||
{unconflicted_state_map, conflicted_state_set}
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -3,35 +3,18 @@ defmodule MatrixServer.RoomServer do
|
||||||
|
|
||||||
alias MatrixServer.{Repo, Room, Event, Account}
|
alias MatrixServer.{Repo, Room, Event, Account}
|
||||||
alias MatrixServerWeb.API.CreateRoom
|
alias MatrixServerWeb.API.CreateRoom
|
||||||
alias Ecto.Multi
|
|
||||||
|
|
||||||
@registry MatrixServer.RoomServer.Registry
|
@registry MatrixServer.RoomServer.Registry
|
||||||
@supervisor MatrixServer.RoomServer.Supervisor
|
@supervisor MatrixServer.RoomServer.Supervisor
|
||||||
|
|
||||||
def get_room_server(room_id) do
|
|
||||||
case Registry.lookup(@registry, room_id) do
|
|
||||||
[{pid, _}] ->
|
|
||||||
{:ok, pid}
|
|
||||||
|
|
||||||
[] ->
|
|
||||||
opts = [
|
|
||||||
room_id: room_id,
|
|
||||||
name: {:via, Registry, {@registry, room_id}}
|
|
||||||
]
|
|
||||||
|
|
||||||
DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts})
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def start_link(opts) do
|
def start_link(opts) do
|
||||||
{name, opts} = Keyword.pop(opts, :name)
|
{name, opts} = Keyword.pop(opts, :name)
|
||||||
GenServer.start_link(__MODULE__, opts, name: name)
|
GenServer.start_link(__MODULE__, opts, name: name)
|
||||||
end
|
end
|
||||||
|
|
||||||
def create_room(input, account) do
|
def create_room(input, account) do
|
||||||
Multi.new()
|
%Room{id: room_id} = room = Repo.insert!(Room.create_changeset(input))
|
||||||
|> Multi.insert(:room, Room.create_changeset(input))
|
|
||||||
|> Multi.run(:room_server, fn _repo, %{room: %Room{id: room_id} = room} ->
|
|
||||||
opts = [
|
opts = [
|
||||||
name: {:via, Registry, {@registry, room_id}},
|
name: {:via, Registry, {@registry, room_id}},
|
||||||
input: input,
|
input: input,
|
||||||
|
@ -40,8 +23,6 @@ defmodule MatrixServer.RoomServer do
|
||||||
]
|
]
|
||||||
|
|
||||||
DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts})
|
DynamicSupervisor.start_child(@supervisor, {__MODULE__, opts})
|
||||||
end)
|
|
||||||
|> Repo.transaction()
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
|
@ -50,27 +31,23 @@ defmodule MatrixServer.RoomServer do
|
||||||
input = Keyword.fetch!(opts, :input)
|
input = Keyword.fetch!(opts, :input)
|
||||||
account = Keyword.fetch!(opts, :account)
|
account = Keyword.fetch!(opts, :account)
|
||||||
|
|
||||||
state = %{
|
|
||||||
room_id: room_id,
|
|
||||||
state_set: %{}
|
|
||||||
}
|
|
||||||
|
|
||||||
Repo.transaction(fn ->
|
Repo.transaction(fn ->
|
||||||
with {:ok, create_room_event} <- insert_create_room_event(account, input, state) do
|
with {:ok, state_set} <- insert_create_room_event(account, input, room_id) do
|
||||||
{:ok, state}
|
{:ok, %{room_id: room_id, state_set: state_set}}
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|
|
||||||
{:ok, state}
|
|
||||||
end
|
end
|
||||||
|
|
||||||
defp insert_create_room_event(
|
defp insert_create_room_event(
|
||||||
%Account{localpart: localpart},
|
%Account{localpart: localpart},
|
||||||
%CreateRoom{room_version: room_version},
|
%CreateRoom{room_version: room_version},
|
||||||
%{room_id: room_id, state_set: state_set}
|
room_id
|
||||||
) do
|
) do
|
||||||
create_room_event = Event.create_room(room_id, MatrixServer.get_mxid(localpart), room_version)
|
state_set =
|
||||||
MatrixServer.StateResolution.resolve(create_room_event)
|
Event.create_room(room_id, MatrixServer.get_mxid(localpart), room_version)
|
||||||
{:ok, create_room_event}
|
|> Repo.insert!()
|
||||||
|
|> MatrixServer.StateResolution.resolve(true)
|
||||||
|
|
||||||
|
{:ok, state_set}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -12,7 +12,7 @@ defmodule MatrixServer.StateResolution do
|
||||||
|> Map.put(:auth_events, ["create", "join_charlie", "b"])
|
|> Map.put(:auth_events, ["create", "join_charlie", "b"])
|
||||||
end
|
end
|
||||||
|
|
||||||
def resolve(%Event{room_id: room_id} = event) do
|
def resolve(%Event{room_id: room_id} = event, apply_state \\ false) do
|
||||||
room_events =
|
room_events =
|
||||||
Event
|
Event
|
||||||
|> where([e], e.room_id == ^room_id)
|
|> where([e], e.room_id == ^room_id)
|
||||||
|
@ -20,68 +20,72 @@ defmodule MatrixServer.StateResolution do
|
||||||
|> Repo.all()
|
|> Repo.all()
|
||||||
|> Enum.into(%{})
|
|> Enum.into(%{})
|
||||||
|
|
||||||
resolve(event, room_events)
|
resolve(event, room_events, apply_state)
|
||||||
end
|
end
|
||||||
|
|
||||||
def resolve(
|
def resolve(
|
||||||
%Event{type: type, state_key: state_key, event_id: event_id, prev_events: prev_event_ids},
|
%Event{type: type, state_key: state_key, event_id: event_id, prev_events: prev_event_ids},
|
||||||
room_events
|
room_events,
|
||||||
|
apply_state
|
||||||
) do
|
) do
|
||||||
state_sets =
|
state_sets =
|
||||||
prev_event_ids
|
prev_event_ids
|
||||||
|> Enum.map(&room_events[&1])
|
|> Enum.map(&room_events[&1])
|
||||||
|> Enum.map(&resolve(&1, room_events))
|
|> Enum.map(&resolve(&1, room_events))
|
||||||
|
|
||||||
resolved_state = resolve(state_sets, room_events)
|
resolved_state = do_resolve(state_sets, room_events)
|
||||||
# TODO: check if state event
|
# TODO: check if state event
|
||||||
|
if apply_state do
|
||||||
Map.put(resolved_state, {type, state_key}, event_id)
|
Map.put(resolved_state, {type, state_key}, event_id)
|
||||||
|
else
|
||||||
|
resolved_state
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def resolve([], _), do: %{}
|
def do_resolve([], _), do: %{}
|
||||||
|
|
||||||
def resolve(state_sets, room_events) do
|
def do_resolve(state_sets, room_events) do
|
||||||
{unconflicted_state_map, conflicted_state_set} = calculate_conflict(state_sets)
|
{unconflicted_state_map, conflicted_state_set} = calculate_conflict(state_sets)
|
||||||
|
|
||||||
if MapSet.size(conflicted_state_set) == 0 do
|
if MapSet.size(conflicted_state_set) == 0 do
|
||||||
unconflicted_state_map
|
unconflicted_state_map
|
||||||
else
|
else
|
||||||
|
do_resolve(state_sets, room_events, unconflicted_state_map, conflicted_state_set)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def do_resolve(state_sets, room_events, unconflicted_state_map, conflicted_state_set) do
|
||||||
full_conflicted_set =
|
full_conflicted_set =
|
||||||
MapSet.union(conflicted_state_set, auth_difference(state_sets, room_events))
|
MapSet.union(conflicted_state_set, auth_difference(state_sets, room_events))
|
||||||
|
|
||||||
conflicted_control_event_ids =
|
conflicted_control_event_ids =
|
||||||
Enum.filter(full_conflicted_set, &is_control_event(&1, room_events)) |> MapSet.new()
|
full_conflicted_set
|
||||||
|
|> Enum.filter(&is_control_event(&1, room_events))
|
||||||
|
|> MapSet.new()
|
||||||
|
|
||||||
conflicted_control_event_ids_with_auth =
|
conflicted_control_events_with_auth_ids =
|
||||||
conflicted_control_event_ids
|
conflicted_control_event_ids
|
||||||
|> MapSet.to_list()
|
|> MapSet.to_list()
|
||||||
|> full_auth_chain(room_events)
|
|> full_auth_chain(room_events)
|
||||||
|> MapSet.intersection(full_conflicted_set)
|
|> MapSet.intersection(full_conflicted_set)
|
||||||
|> MapSet.union(conflicted_control_event_ids)
|
|> MapSet.union(conflicted_control_event_ids)
|
||||||
|
|
||||||
conflicted_control_events_with_auth =
|
|
||||||
Enum.map(conflicted_control_event_ids_with_auth, &room_events[&1])
|
|
||||||
|
|
||||||
sorted_control_events =
|
sorted_control_events =
|
||||||
Enum.sort(conflicted_control_events_with_auth, rev_top_pow_order(room_events))
|
conflicted_control_events_with_auth_ids
|
||||||
|
|> Enum.map(&room_events[&1])
|
||||||
|
|> Enum.sort(rev_top_pow_order(room_events))
|
||||||
|
|
||||||
partial_resolved_state =
|
partial_resolved_state =
|
||||||
iterative_auth_checks(sorted_control_events, unconflicted_state_map, room_events)
|
iterative_auth_checks(sorted_control_events, unconflicted_state_map, room_events)
|
||||||
|
|
||||||
other_conflicted_event_ids =
|
|
||||||
MapSet.difference(full_conflicted_set, conflicted_control_event_ids_with_auth)
|
|
||||||
|
|
||||||
other_conflicted_events = Enum.map(other_conflicted_event_ids, &room_events[&1])
|
|
||||||
|
|
||||||
resolved_power_levels = partial_resolved_state[{"m.room.power_levels", ""}]
|
resolved_power_levels = partial_resolved_state[{"m.room.power_levels", ""}]
|
||||||
|
|
||||||
sorted_other_events =
|
conflicted_control_events_with_auth_ids
|
||||||
Enum.sort(other_conflicted_events, mainline_order(resolved_power_levels, room_events))
|
|> MapSet.difference(full_conflicted_set)
|
||||||
|
|> Enum.map(&room_events[&1])
|
||||||
nearly_final_state =
|
|> Enum.sort(mainline_order(resolved_power_levels, room_events))
|
||||||
iterative_auth_checks(sorted_other_events, partial_resolved_state, room_events)
|
|> iterative_auth_checks(partial_resolved_state, room_events)
|
||||||
|
|> Map.merge(unconflicted_state_map)
|
||||||
Map.merge(nearly_final_state, unconflicted_state_map)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def calculate_conflict(state_sets) do
|
def calculate_conflict(state_sets) do
|
||||||
|
@ -256,11 +260,14 @@ defmodule MatrixServer.StateResolution do
|
||||||
|
|
||||||
def iterative_auth_checks(events, state_set, room_events) do
|
def iterative_auth_checks(events, state_set, room_events) do
|
||||||
Enum.reduce(events, state_set, fn event, acc ->
|
Enum.reduce(events, state_set, fn event, acc ->
|
||||||
if is_authorized2(event, acc, room_events), do: insert_event(event, acc), else: acc
|
if is_authorized2(event, acc, room_events), do: update_state_set(event, acc), else: acc
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
def insert_event(%Event{type: event_type, state_key: state_key, event_id: event_id}, state_set) do
|
def update_state_set(
|
||||||
|
%Event{type: event_type, state_key: state_key, event_id: event_id},
|
||||||
|
state_set
|
||||||
|
) do
|
||||||
Map.put(state_set, {event_type, state_key}, event_id)
|
Map.put(state_set, {event_type, state_key}, event_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,15 @@
|
||||||
alias MatrixServer.{Repo, Room, Event}
|
alias MatrixServer.{Repo, Room, Event, Account, Device}
|
||||||
|
|
||||||
|
Repo.insert!(%Account{
|
||||||
|
localpart: "chuck",
|
||||||
|
password_hash: Bcrypt.hash_pwd_salt("sneed")
|
||||||
|
})
|
||||||
|
|
||||||
|
Repo.insert(%Device{
|
||||||
|
device_id: "android",
|
||||||
|
display_name: "My Android",
|
||||||
|
localpart: "chuck"
|
||||||
|
})
|
||||||
|
|
||||||
# Auth difference example from here:
|
# Auth difference example from here:
|
||||||
# https://matrix.org/docs/guides/implementing-stateres#auth-differences
|
# https://matrix.org/docs/guides/implementing-stateres#auth-differences
|
||||||
|
|
Loading…
Reference in a new issue