Implement client send message endpoint

Fix state res and authorization to handle missing state_keys
This commit is contained in:
Pim Kunis 2021-08-31 16:32:04 +02:00
parent 5b3ec55f8b
commit c42fc7c350
8 changed files with 168 additions and 46 deletions

View file

@ -13,7 +13,17 @@ defmodule MatrixServer.RoomServer do
import Ecto.Query import Ecto.Query
import Ecto.Changeset import Ecto.Changeset
alias MatrixServer.{Repo, Room, Event, StateResolution, Account, JoinedRoom} alias MatrixServer.{
Repo,
Room,
Event,
StateResolution,
Account,
JoinedRoom,
Device,
DeviceTransaction
}
alias MatrixServer.StateResolution.Authorization alias MatrixServer.StateResolution.Authorization
alias MatrixServerWeb.Client.Request.{CreateRoom, Kick, Ban} alias MatrixServerWeb.Client.Request.{CreateRoom, Kick, Ban}
@ -154,6 +164,15 @@ defmodule MatrixServer.RoomServer do
GenServer.call(pid, {:set_visibility, account, visibility}) GenServer.call(pid, {:set_visibility, account, visibility})
end end
@doc """
Send a message to this room.
"""
@spec send_message(pid(), Account.t(), Device.t(), String.t(), map(), String.t()) ::
{:ok, String.t()} | {:error, atom()}
def send_message(pid, account, device, event_type, content, txn_id) do
GenServer.call(pid, {:send_message, account, device, event_type, content, txn_id})
end
### Implementation ### Implementation
@impl true @impl true
@ -248,8 +267,8 @@ defmodule MatrixServer.RoomServer do
def handle_call({:invite, account, user_id}, _from, %{room: room, state_set: state_set} = state) do def handle_call({:invite, account, user_id}, _from, %{room: room, state_set: state_set} = state) do
invite_event = Event.Invite.new(room, account, user_id) invite_event = Event.Invite.new(room, account, user_id)
case insert_single_event(room, state_set, invite_event) do case Repo.transaction(insert_single_event(room, state_set, invite_event)) do
{:ok, {state_set, room}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}}
{:error, reason} -> {:reply, {:error, reason}, state} {:error, reason} -> {:reply, {:error, reason}, state}
end end
end end
@ -261,8 +280,8 @@ defmodule MatrixServer.RoomServer do
) do ) do
join_event = Event.Join.new(room, account) join_event = Event.Join.new(room, account)
case insert_single_event(room, state_set, join_event) do case Repo.transaction(insert_single_event(room, state_set, join_event)) do
{:ok, {state_set, room}} -> {:ok, {state_set, room, _}} ->
{:reply, {:ok, room_id}, %{state | state_set: state_set, room: room}} {:reply, {:ok, room_id}, %{state | state_set: state_set, room: room}}
{:error, reason} -> {:error, reason} ->
@ -273,8 +292,8 @@ defmodule MatrixServer.RoomServer do
def handle_call({:leave, account}, _from, %{room: room, state_set: state_set} = state) do def handle_call({:leave, account}, _from, %{room: room, state_set: state_set} = state) do
leave_event = Event.Leave.new(room, account) leave_event = Event.Leave.new(room, account)
case insert_single_event(room, state_set, leave_event) do case Repo.transaction(insert_single_event(room, state_set, leave_event)) do
{:ok, {state_set, room}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}}
{:error, reason} -> {:reply, {:error, reason}, state} {:error, reason} -> {:reply, {:error, reason}, state}
end end
end end
@ -286,8 +305,8 @@ defmodule MatrixServer.RoomServer do
) do ) do
kick_event = Event.Kick.new(room, account, user_id, reason) kick_event = Event.Kick.new(room, account, user_id, reason)
case insert_single_event(room, state_set, kick_event) do case Repo.transaction(insert_single_event(room, state_set, kick_event)) do
{:ok, {state_set, room}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}}
{:error, reason} -> {:reply, {:error, reason}, state} {:error, reason} -> {:reply, {:error, reason}, state}
end end
end end
@ -299,8 +318,8 @@ defmodule MatrixServer.RoomServer do
) do ) do
ban_event = Event.Ban.new(room, account, user_id, reason) ban_event = Event.Ban.new(room, account, user_id, reason)
case insert_single_event(room, state_set, ban_event) do case Repo.transaction(insert_single_event(room, state_set, ban_event)) do
{:ok, {state_set, room}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}}
{:error, reason} -> {:reply, {:error, reason}, state} {:error, reason} -> {:reply, {:error, reason}, state}
end end
end end
@ -308,8 +327,8 @@ defmodule MatrixServer.RoomServer do
def handle_call({:unban, account, user_id}, _from, %{room: room, state_set: state_set} = state) do def handle_call({:unban, account, user_id}, _from, %{room: room, state_set: state_set} = state) do
unban_event = Event.Unban.new(room, account, user_id) unban_event = Event.Unban.new(room, account, user_id)
case insert_single_event(room, state_set, unban_event) do case Repo.transaction(insert_single_event(room, state_set, unban_event)) do
{:ok, {state_set, room}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}}
{:error, reason} -> {:reply, {:error, reason}, state} {:error, reason} -> {:reply, {:error, reason}, state}
end end
end end
@ -334,18 +353,64 @@ defmodule MatrixServer.RoomServer do
end end
end end
@spec insert_single_event(Room.t(), t(), Event.t()) :: {:ok, {t(), Room.t()}} | {:error, atom()} def handle_call(
{:send_message, account, device, event_type, content, txn_id},
_from,
%{room: room, state_set: state_set} = state
) do
message_event = Event.custom_message(room, account, event_type, content)
case Repo.transaction(insert_custom_message(state_set, room, device, message_event, txn_id)) do
{:ok, {state_set, room, event_id}} ->
{:reply, {:ok, event_id}, %{state | state_set: state_set, room: room}}
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end
defp insert_custom_message(
state_set,
room,
%Device{id: device_id} = device,
message_event,
txn_id
) do
fn ->
# Check if we already executed this transaction.
case Repo.one(
from dt in DeviceTransaction,
where: dt.txn_id == ^txn_id and dt.device_id == ^device_id
) do
%DeviceTransaction{event_id: event_id} ->
{state_set, room, event_id}
nil ->
with {state_set, room, %Event{event_id: event_id}} <-
insert_single_event(room, state_set, message_event).() do
# Mark this transaction as done.
Ecto.build_assoc(device, :device_transactions, txn_id: txn_id, event_id: event_id)
|> Repo.insert!()
{state_set, room, event_id}
end
end
end
end
@spec insert_single_event(Room.t(), t(), Event.t()) ::
(() -> {t(), Room.t(), Event.t()} | {:error, atom()})
defp insert_single_event(room, state_set, event) do defp insert_single_event(room, state_set, event) do
Repo.transaction(fn -> fn ->
case finalize_and_insert_event(event, state_set, room) do case finalize_and_insert_event(event, state_set, room) do
{:ok, state_set, room} -> {:ok, state_set, room, event} ->
_ = update_room_state_set(room, state_set) _ = update_room_state_set(room, state_set)
{state_set, room} {state_set, room, event}
{:error, reason} -> {:error, reason} ->
Repo.rollback(reason) Repo.rollback(reason)
end end
end) end
end end
# Get a function that inserts all events for room creation. # Get a function that inserts all events for room creation.
@ -374,7 +439,7 @@ defmodule MatrixServer.RoomServer do
result = result =
Enum.reduce_while(events, {%{}, room}, fn event, {state_set, room} -> Enum.reduce_while(events, {%{}, room}, fn event, {state_set, room} ->
case finalize_and_insert_event(event, state_set, room) do case finalize_and_insert_event(event, state_set, room) do
{:ok, state_set, room} -> {:cont, {state_set, room}} {:ok, state_set, room, _} -> {:cont, {state_set, room}}
{:error, reason} -> {:halt, {:error, reason}} {:error, reason} -> {:halt, {:error, reason}}
end end
end) end)
@ -441,7 +506,7 @@ defmodule MatrixServer.RoomServer do
# - Event ID # - Event ID
# - Signature # - Signature
@spec finalize_and_insert_event(Event.t(), t(), Room.t()) :: @spec finalize_and_insert_event(Event.t(), t(), Room.t()) ::
{:ok, t(), Room.t()} | {:error, atom()} {:ok, t(), Room.t(), Event.t()} | {:error, atom()}
defp finalize_and_insert_event( defp finalize_and_insert_event(
event, event,
state_set, state_set,
@ -512,7 +577,7 @@ defmodule MatrixServer.RoomServer do
# Implements the checks as described in the # Implements the checks as described in the
# [Matrix docs](https://matrix.org/docs/spec/server_server/latest#checks-performed-on-receipt-of-a-pdu). # [Matrix docs](https://matrix.org/docs/spec/server_server/latest#checks-performed-on-receipt-of-a-pdu).
@spec authenticate_and_insert_event(Event.t(), t(), Room.t()) :: @spec authenticate_and_insert_event(Event.t(), t(), Room.t()) ::
{:ok, t(), Room.t()} | {:error, atom()} {:ok, t(), Room.t(), Event.t()} | {:error, atom()}
defp authenticate_and_insert_event(event, current_state_set, room) do defp authenticate_and_insert_event(event, current_state_set, room) do
# TODO: Correctly handle soft fails. # TODO: Correctly handle soft fails.
# Check the following things: # Check the following things:
@ -533,7 +598,7 @@ defmodule MatrixServer.RoomServer do
# TODO: Do this as a background job, and not after every insert... # TODO: Do this as a background job, and not after every insert...
_ = update_joined_rooms(room, state_set) _ = update_joined_rooms(room, state_set)
{:ok, state_set, room} {:ok, state_set, room, event}
else else
_ -> {:error, :authorization} _ -> {:error, :authorization}
end end

View file

@ -3,7 +3,7 @@ defmodule MatrixServer.Device do
import Ecto.{Changeset, Query} import Ecto.{Changeset, Query}
alias MatrixServer.{Account, Device, Repo} alias MatrixServer.{Account, Device, Repo, DeviceTransaction}
alias MatrixServerWeb.Client.Request.Login alias MatrixServerWeb.Client.Request.Login
@type t :: %__MODULE__{ @type t :: %__MODULE__{
@ -19,6 +19,7 @@ defmodule MatrixServer.Device do
field :display_name, :string field :display_name, :string
belongs_to :account, Account belongs_to :account, Account
has_many :device_transactions, DeviceTransaction
end end
def changeset(device, params \\ %{}) do def changeset(device, params \\ %{}) do

View file

@ -0,0 +1,18 @@
defmodule MatrixServer.DeviceTransaction do
use Ecto.Schema
alias MatrixServer.Device
@type t :: %__MODULE__{
txn_id: String.t(),
event_id: String.t(),
device_id: integer()
}
@primary_key {:txn_id, :string, []}
schema "device_transactions" do
field :event_id, :string
belongs_to :device, Device
end
end

View file

@ -10,7 +10,7 @@ defmodule MatrixServer.Event do
@type t :: %__MODULE__{ @type t :: %__MODULE__{
type: String.t(), type: String.t(),
origin_server_ts: integer(), origin_server_ts: integer(),
state_key: String.t(), state_key: String.t() | nil,
sender: UserId.t(), sender: UserId.t(),
content: map(), content: map(),
prev_events: [String.t()] | nil, prev_events: [String.t()] | nil,
@ -73,6 +73,15 @@ defmodule MatrixServer.Event do
} }
end end
@spec custom_message(Room.t(), Account.t(), String.t(), map()) :: t()
def custom_message(room, sender, type, content) do
%Event{
Event.new(room, sender)
| type: type,
content: content
}
end
@spec is_control_event(t()) :: boolean() @spec is_control_event(t()) :: boolean()
def is_control_event(%Event{type: "m.room.power_levels", state_key: ""}), do: true def is_control_event(%Event{type: "m.room.power_levels", state_key: ""}), do: true
def is_control_event(%Event{type: "m.room.join_rules", state_key: ""}), do: true def is_control_event(%Event{type: "m.room.join_rules", state_key: ""}), do: true
@ -131,7 +140,7 @@ defmodule MatrixServer.Event do
length(prev_events) == length(prev_event_ids) and length(prev_events) == length(prev_event_ids) and
not MatrixServer.has_duplicates?(state_pairs) and not MatrixServer.has_duplicates?(state_pairs) and
valid_auth_events?(event, auth_events) and valid_auth_events?(event, auth_events) and
Enum.find_value(state_pairs, &(&1 == {"m.room.create", ""})) and Enum.find_value(state_pairs, false, &(&1 == {"m.room.create", ""})) and
do_prevalidate(event, auth_events, prev_events) do_prevalidate(event, auth_events, prev_events)
end end

View file

@ -1,6 +1,6 @@
defmodule MatrixServer.StateResolution.Authorization do defmodule MatrixServer.StateResolution.Authorization do
@moduledoc """ @moduledoc """
Implementation of Matrix event authorization rules for stat resolution. Implementation of Matrix event authorization rules for state resolution.
Note that some authorization rules are already checked in Note that some authorization rules are already checked in
`MatrixServer.Event.prevalidate/1` so they are skipped here. `MatrixServer.Event.prevalidate/1` so they are skipped here.
@ -156,9 +156,14 @@ defmodule MatrixServer.StateResolution.Authorization do
# Check rules: 8, 9 # Check rules: 8, 9
cond do cond do
not has_power_level?(to_string(sender), power_levels, {:event, event}) -> false not has_power_level?(to_string(sender), power_levels, {:event, event}) ->
String.starts_with?(state_key, "@") and state_key != sender -> false false
true -> __authorized?(event, state_set)
is_binary(state_key) and String.starts_with?(state_key, "@") and state_key != sender ->
false
true ->
__authorized?(event, state_set)
end end
end end

View file

@ -208,13 +208,28 @@ defmodule MatrixServerWeb.Client.RoomController do
def unban(conn, _), do: put_error(conn, :missing_param) def unban(conn, _), do: put_error(conn, :missing_param)
def send_message(%Conn{assigns: %{account: account}, body_params: body_params} = conn, %{ def send_message(
%Conn{assigns: %{account: account, device: device}, body_params: body_params} = conn,
%{
"room_id" => room_id, "room_id" => room_id,
"event_type" => event_type, "event_type" => event_type,
"txn_id" => txn_id "txn_id" => txn_id
}) do }
) do
case RoomServer.get_room_server(room_id) do
{:ok, pid} ->
case RoomServer.send_message(pid, account, device, event_type, body_params, txn_id) do
{:ok, event_id} ->
conn conn
|> send_resp(200, []) |> put_status(200)
|> halt() |> json(%{event_id: event_id})
{:error, _} ->
put_error(conn, :unknown)
end
{:error, :not_found} ->
put_error(conn, :not_found, "The given room was not found.")
end
end end
end end

View file

@ -63,7 +63,7 @@ defmodule MatrixServer.Repo.Migrations.CreateInitialTables do
create table(:devices) do create table(:devices) do
add :device_id, :string, null: false add :device_id, :string, null: false
add :access_token, :string add :access_token, :string, null: false
add :display_name, :string add :display_name, :string
add :account_id, references(:accounts, on_delete: :delete_all), null: false add :account_id, references(:accounts, on_delete: :delete_all), null: false
@ -72,5 +72,11 @@ defmodule MatrixServer.Repo.Migrations.CreateInitialTables do
create index(:devices, [:device_id, :account_id], unique: true) create index(:devices, [:device_id, :account_id], unique: true)
create index(:devices, [:account_id]) create index(:devices, [:account_id])
create index(:devices, [:access_token], unique: true) create index(:devices, [:access_token], unique: true)
create table(:device_transactions, primary_key: false) do
add :txn_id, :string, primary_key: true, null: false
add :device_id, references(:devices, on_delete: :delete_all), primary_key: true, null: false
add :event_id, :string, null: false
end
end end
end end

View file

@ -1,12 +1,15 @@
alias MatrixServer.{Repo, Account, Device} alias MatrixServer.{Repo, Account}
account =
Repo.insert!(%Account{ Repo.insert!(%Account{
localpart: "chuck", localpart: "chuck",
password_hash: Bcrypt.hash_pwd_salt("sneed") password_hash: Bcrypt.hash_pwd_salt("sneed")
}) })
Repo.insert(%Device{ account
|> Ecto.build_assoc(:devices,
device_id: "android", device_id: "android",
display_name: "My Android", display_name: "My Android",
localpart: "chuck" access_token: "sneed"
}) )
|> Repo.insert!()