From c42fc7c35057e6f04b2c43b2463fedae8f8ab6cb Mon Sep 17 00:00:00 2001 From: Pim Kunis Date: Tue, 31 Aug 2021 16:32:04 +0200 Subject: [PATCH] Implement client send message endpoint Fix state res and authorization to handle missing state_keys --- lib/matrix_server/room_server.ex | 109 ++++++++++++++---- lib/matrix_server/schema/device.ex | 3 +- .../schema/device_transaction.ex | 18 +++ lib/matrix_server/schema/event.ex | 13 ++- .../state_resolution/authorization.ex | 13 ++- .../client/controllers/room_controller.ex | 31 +++-- .../20210830160818_create_initial_tables.exs | 8 +- priv/repo/seeds.exs | 19 +-- 8 files changed, 168 insertions(+), 46 deletions(-) create mode 100644 lib/matrix_server/schema/device_transaction.ex diff --git a/lib/matrix_server/room_server.ex b/lib/matrix_server/room_server.ex index 1c9daca..54dd12f 100644 --- a/lib/matrix_server/room_server.ex +++ b/lib/matrix_server/room_server.ex @@ -13,7 +13,17 @@ defmodule MatrixServer.RoomServer do import Ecto.Query import Ecto.Changeset - alias MatrixServer.{Repo, Room, Event, StateResolution, Account, JoinedRoom} + alias MatrixServer.{ + Repo, + Room, + Event, + StateResolution, + Account, + JoinedRoom, + Device, + DeviceTransaction + } + alias MatrixServer.StateResolution.Authorization alias MatrixServerWeb.Client.Request.{CreateRoom, Kick, Ban} @@ -154,6 +164,15 @@ defmodule MatrixServer.RoomServer do GenServer.call(pid, {:set_visibility, account, visibility}) end + @doc """ + Send a message to this room. + """ + @spec send_message(pid(), Account.t(), Device.t(), String.t(), map(), String.t()) :: + {:ok, String.t()} | {:error, atom()} + def send_message(pid, account, device, event_type, content, txn_id) do + GenServer.call(pid, {:send_message, account, device, event_type, content, txn_id}) + end + ### Implementation @impl true @@ -248,8 +267,8 @@ defmodule MatrixServer.RoomServer do def handle_call({:invite, account, user_id}, _from, %{room: room, state_set: state_set} = state) do invite_event = Event.Invite.new(room, account, user_id) - case insert_single_event(room, state_set, invite_event) do - {:ok, {state_set, room}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} + case Repo.transaction(insert_single_event(room, state_set, invite_event)) do + {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end @@ -261,8 +280,8 @@ defmodule MatrixServer.RoomServer do ) do join_event = Event.Join.new(room, account) - case insert_single_event(room, state_set, join_event) do - {:ok, {state_set, room}} -> + case Repo.transaction(insert_single_event(room, state_set, join_event)) do + {:ok, {state_set, room, _}} -> {:reply, {:ok, room_id}, %{state | state_set: state_set, room: room}} {:error, reason} -> @@ -273,8 +292,8 @@ defmodule MatrixServer.RoomServer do def handle_call({:leave, account}, _from, %{room: room, state_set: state_set} = state) do leave_event = Event.Leave.new(room, account) - case insert_single_event(room, state_set, leave_event) do - {:ok, {state_set, room}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} + case Repo.transaction(insert_single_event(room, state_set, leave_event)) do + {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end @@ -286,8 +305,8 @@ defmodule MatrixServer.RoomServer do ) do kick_event = Event.Kick.new(room, account, user_id, reason) - case insert_single_event(room, state_set, kick_event) do - {:ok, {state_set, room}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} + case Repo.transaction(insert_single_event(room, state_set, kick_event)) do + {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end @@ -299,8 +318,8 @@ defmodule MatrixServer.RoomServer do ) do ban_event = Event.Ban.new(room, account, user_id, reason) - case insert_single_event(room, state_set, ban_event) do - {:ok, {state_set, room}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} + case Repo.transaction(insert_single_event(room, state_set, ban_event)) do + {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end @@ -308,8 +327,8 @@ defmodule MatrixServer.RoomServer do def handle_call({:unban, account, user_id}, _from, %{room: room, state_set: state_set} = state) do unban_event = Event.Unban.new(room, account, user_id) - case insert_single_event(room, state_set, unban_event) do - {:ok, {state_set, room}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} + case Repo.transaction(insert_single_event(room, state_set, unban_event)) do + {:ok, {state_set, room, _}} -> {:reply, :ok, %{state | state_set: state_set, room: room}} {:error, reason} -> {:reply, {:error, reason}, state} end end @@ -334,18 +353,64 @@ defmodule MatrixServer.RoomServer do end end - @spec insert_single_event(Room.t(), t(), Event.t()) :: {:ok, {t(), Room.t()}} | {:error, atom()} + def handle_call( + {:send_message, account, device, event_type, content, txn_id}, + _from, + %{room: room, state_set: state_set} = state + ) do + message_event = Event.custom_message(room, account, event_type, content) + + case Repo.transaction(insert_custom_message(state_set, room, device, message_event, txn_id)) do + {:ok, {state_set, room, event_id}} -> + {:reply, {:ok, event_id}, %{state | state_set: state_set, room: room}} + + {:error, reason} -> + {:reply, {:error, reason}, state} + end + end + + defp insert_custom_message( + state_set, + room, + %Device{id: device_id} = device, + message_event, + txn_id + ) do + fn -> + # Check if we already executed this transaction. + case Repo.one( + from dt in DeviceTransaction, + where: dt.txn_id == ^txn_id and dt.device_id == ^device_id + ) do + %DeviceTransaction{event_id: event_id} -> + {state_set, room, event_id} + + nil -> + with {state_set, room, %Event{event_id: event_id}} <- + insert_single_event(room, state_set, message_event).() do + # Mark this transaction as done. + Ecto.build_assoc(device, :device_transactions, txn_id: txn_id, event_id: event_id) + |> Repo.insert!() + + {state_set, room, event_id} + end + end + end + end + + @spec insert_single_event(Room.t(), t(), Event.t()) :: + (() -> {t(), Room.t(), Event.t()} | {:error, atom()}) defp insert_single_event(room, state_set, event) do - Repo.transaction(fn -> + fn -> case finalize_and_insert_event(event, state_set, room) do - {:ok, state_set, room} -> + {:ok, state_set, room, event} -> _ = update_room_state_set(room, state_set) - {state_set, room} + {state_set, room, event} {:error, reason} -> Repo.rollback(reason) end - end) + end end # Get a function that inserts all events for room creation. @@ -374,7 +439,7 @@ defmodule MatrixServer.RoomServer do result = Enum.reduce_while(events, {%{}, room}, fn event, {state_set, room} -> case finalize_and_insert_event(event, state_set, room) do - {:ok, state_set, room} -> {:cont, {state_set, room}} + {:ok, state_set, room, _} -> {:cont, {state_set, room}} {:error, reason} -> {:halt, {:error, reason}} end end) @@ -441,7 +506,7 @@ defmodule MatrixServer.RoomServer do # - Event ID # - Signature @spec finalize_and_insert_event(Event.t(), t(), Room.t()) :: - {:ok, t(), Room.t()} | {:error, atom()} + {:ok, t(), Room.t(), Event.t()} | {:error, atom()} defp finalize_and_insert_event( event, state_set, @@ -512,7 +577,7 @@ defmodule MatrixServer.RoomServer do # Implements the checks as described in the # [Matrix docs](https://matrix.org/docs/spec/server_server/latest#checks-performed-on-receipt-of-a-pdu). @spec authenticate_and_insert_event(Event.t(), t(), Room.t()) :: - {:ok, t(), Room.t()} | {:error, atom()} + {:ok, t(), Room.t(), Event.t()} | {:error, atom()} defp authenticate_and_insert_event(event, current_state_set, room) do # TODO: Correctly handle soft fails. # Check the following things: @@ -533,7 +598,7 @@ defmodule MatrixServer.RoomServer do # TODO: Do this as a background job, and not after every insert... _ = update_joined_rooms(room, state_set) - {:ok, state_set, room} + {:ok, state_set, room, event} else _ -> {:error, :authorization} end diff --git a/lib/matrix_server/schema/device.ex b/lib/matrix_server/schema/device.ex index 3e1a6ba..0c8b80b 100644 --- a/lib/matrix_server/schema/device.ex +++ b/lib/matrix_server/schema/device.ex @@ -3,7 +3,7 @@ defmodule MatrixServer.Device do import Ecto.{Changeset, Query} - alias MatrixServer.{Account, Device, Repo} + alias MatrixServer.{Account, Device, Repo, DeviceTransaction} alias MatrixServerWeb.Client.Request.Login @type t :: %__MODULE__{ @@ -19,6 +19,7 @@ defmodule MatrixServer.Device do field :display_name, :string belongs_to :account, Account + has_many :device_transactions, DeviceTransaction end def changeset(device, params \\ %{}) do diff --git a/lib/matrix_server/schema/device_transaction.ex b/lib/matrix_server/schema/device_transaction.ex new file mode 100644 index 0000000..5d2d4a9 --- /dev/null +++ b/lib/matrix_server/schema/device_transaction.ex @@ -0,0 +1,18 @@ +defmodule MatrixServer.DeviceTransaction do + use Ecto.Schema + + alias MatrixServer.Device + + @type t :: %__MODULE__{ + txn_id: String.t(), + event_id: String.t(), + device_id: integer() + } + + @primary_key {:txn_id, :string, []} + schema "device_transactions" do + field :event_id, :string + + belongs_to :device, Device + end +end diff --git a/lib/matrix_server/schema/event.ex b/lib/matrix_server/schema/event.ex index 798e050..77b7000 100644 --- a/lib/matrix_server/schema/event.ex +++ b/lib/matrix_server/schema/event.ex @@ -10,7 +10,7 @@ defmodule MatrixServer.Event do @type t :: %__MODULE__{ type: String.t(), origin_server_ts: integer(), - state_key: String.t(), + state_key: String.t() | nil, sender: UserId.t(), content: map(), prev_events: [String.t()] | nil, @@ -73,6 +73,15 @@ defmodule MatrixServer.Event do } end + @spec custom_message(Room.t(), Account.t(), String.t(), map()) :: t() + def custom_message(room, sender, type, content) do + %Event{ + Event.new(room, sender) + | type: type, + content: content + } + end + @spec is_control_event(t()) :: boolean() def is_control_event(%Event{type: "m.room.power_levels", state_key: ""}), do: true def is_control_event(%Event{type: "m.room.join_rules", state_key: ""}), do: true @@ -131,7 +140,7 @@ defmodule MatrixServer.Event do length(prev_events) == length(prev_event_ids) and not MatrixServer.has_duplicates?(state_pairs) and valid_auth_events?(event, auth_events) and - Enum.find_value(state_pairs, &(&1 == {"m.room.create", ""})) and + Enum.find_value(state_pairs, false, &(&1 == {"m.room.create", ""})) and do_prevalidate(event, auth_events, prev_events) end diff --git a/lib/matrix_server/state_resolution/authorization.ex b/lib/matrix_server/state_resolution/authorization.ex index b8e83b5..6559ff2 100644 --- a/lib/matrix_server/state_resolution/authorization.ex +++ b/lib/matrix_server/state_resolution/authorization.ex @@ -1,6 +1,6 @@ defmodule MatrixServer.StateResolution.Authorization do @moduledoc """ - Implementation of Matrix event authorization rules for stat resolution. + Implementation of Matrix event authorization rules for state resolution. Note that some authorization rules are already checked in `MatrixServer.Event.prevalidate/1` so they are skipped here. @@ -156,9 +156,14 @@ defmodule MatrixServer.StateResolution.Authorization do # Check rules: 8, 9 cond do - not has_power_level?(to_string(sender), power_levels, {:event, event}) -> false - String.starts_with?(state_key, "@") and state_key != sender -> false - true -> __authorized?(event, state_set) + not has_power_level?(to_string(sender), power_levels, {:event, event}) -> + false + + is_binary(state_key) and String.starts_with?(state_key, "@") and state_key != sender -> + false + + true -> + __authorized?(event, state_set) end end diff --git a/lib/matrix_server_web/client/controllers/room_controller.ex b/lib/matrix_server_web/client/controllers/room_controller.ex index 38f5a35..e9cc79a 100644 --- a/lib/matrix_server_web/client/controllers/room_controller.ex +++ b/lib/matrix_server_web/client/controllers/room_controller.ex @@ -208,13 +208,28 @@ defmodule MatrixServerWeb.Client.RoomController do def unban(conn, _), do: put_error(conn, :missing_param) - def send_message(%Conn{assigns: %{account: account}, body_params: body_params} = conn, %{ - "room_id" => room_id, - "event_type" => event_type, - "txn_id" => txn_id - }) do - conn - |> send_resp(200, []) - |> halt() + def send_message( + %Conn{assigns: %{account: account, device: device}, body_params: body_params} = conn, + %{ + "room_id" => room_id, + "event_type" => event_type, + "txn_id" => txn_id + } + ) do + case RoomServer.get_room_server(room_id) do + {:ok, pid} -> + case RoomServer.send_message(pid, account, device, event_type, body_params, txn_id) do + {:ok, event_id} -> + conn + |> put_status(200) + |> json(%{event_id: event_id}) + + {:error, _} -> + put_error(conn, :unknown) + end + + {:error, :not_found} -> + put_error(conn, :not_found, "The given room was not found.") + end end end diff --git a/priv/repo/migrations/20210830160818_create_initial_tables.exs b/priv/repo/migrations/20210830160818_create_initial_tables.exs index b67cd55..9884e2c 100644 --- a/priv/repo/migrations/20210830160818_create_initial_tables.exs +++ b/priv/repo/migrations/20210830160818_create_initial_tables.exs @@ -63,7 +63,7 @@ defmodule MatrixServer.Repo.Migrations.CreateInitialTables do create table(:devices) do add :device_id, :string, null: false - add :access_token, :string + add :access_token, :string, null: false add :display_name, :string add :account_id, references(:accounts, on_delete: :delete_all), null: false @@ -72,5 +72,11 @@ defmodule MatrixServer.Repo.Migrations.CreateInitialTables do create index(:devices, [:device_id, :account_id], unique: true) create index(:devices, [:account_id]) create index(:devices, [:access_token], unique: true) + + create table(:device_transactions, primary_key: false) do + add :txn_id, :string, primary_key: true, null: false + add :device_id, references(:devices, on_delete: :delete_all), primary_key: true, null: false + add :event_id, :string, null: false + end end end diff --git a/priv/repo/seeds.exs b/priv/repo/seeds.exs index ef17929..527b63f 100644 --- a/priv/repo/seeds.exs +++ b/priv/repo/seeds.exs @@ -1,12 +1,15 @@ -alias MatrixServer.{Repo, Account, Device} +alias MatrixServer.{Repo, Account} -Repo.insert!(%Account{ - localpart: "chuck", - password_hash: Bcrypt.hash_pwd_salt("sneed") -}) +account = + Repo.insert!(%Account{ + localpart: "chuck", + password_hash: Bcrypt.hash_pwd_salt("sneed") + }) -Repo.insert(%Device{ +account +|> Ecto.build_assoc(:devices, device_id: "android", display_name: "My Android", - localpart: "chuck" -}) + access_token: "sneed" +) +|> Repo.insert!()