Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ config :logflare,
]
|> filter_nil_kv_pairs.()

config :logflare,
:bigquery_backend_adaptor,
[
managed_service_account_pool_size:
System.get_env("LOGFLARE_BIGQUERY_MANAGED_SA_POOL", "0")
|> String.to_integer()
]
|> filter_nil_kv_pairs.()

config :logflare,
Logflare.Alerting,
[
Expand Down
112 changes: 58 additions & 54 deletions lib/logflare/google/resource_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@ defmodule Logflare.Google.CloudResourceManager do
@moduledoc false
require Logger

import Ecto.Query

alias GoogleApi.CloudResourceManager.V1.Api
alias GoogleApi.CloudResourceManager.V1.Model
alias Logflare.Repo
alias Logflare.Google.BigQuery.GenUtils
alias Logflare.User
alias Logflare.Users
alias Logflare.TeamUsers
alias Logflare.Billing
alias Logflare.Utils.Tasks
alias Logflare.Backends.Adaptor.BigQueryAdaptor

Expand Down Expand Up @@ -68,20 +64,48 @@ defmodule Logflare.Google.CloudResourceManager do
}
)

{:error, response} ->
Logger.error("Set IAM policy error: #{GenUtils.get_tesla_error_message(response)}",
logflare: %{
google: %{
cloudresourcemanager: %{
"#{caller}": %{
accounts: Enum.count(members),
response: :error,
response_message: "#{GenUtils.get_tesla_error_message(response)}"
}
}
}
}
)
{:error, %Tesla.Env{} = response} ->
message = GenUtils.get_tesla_error_message(response)
user_exists_regexp = ~r/User (\S+?@\S+) does not exist/

cond do
message =~ user_exists_regexp ->
[captured] = Regex.run(user_exists_regexp, message, capture: :all_but_first)
# set user as invalid google account
result =
cond do
user = Users.get_by(email: captured) ->
user
|> Users.update_user_all_fields(%{valid_google_account: false})

team_user = TeamUsers.get_team_user_by(email: captured) ->
team_user
|> TeamUsers.update_team_user(%{valid_google_account: false})

true ->
:noop
end

if result == :noop do
Logger.error(
"Could find user #{captured} in the database. Set IAM policy error: #{message}"
)
else
Logger.info(
"Google account #{captured} was marked as invalid and excluded from IAM policy"
)
end

true ->
Logger.error("Set IAM policy error: #{message}",
error_string: Jason.decode!(response.body)
)

:noop
end

{:error, err} ->
Logger.error("Set IAM policy unknown error: #{inspect(err)}")
end
end

Expand Down Expand Up @@ -138,46 +162,26 @@ defmodule Logflare.Google.CloudResourceManager do
end

defp build_members() do
query =
from(u in User,
join: t in assoc(u, :team),
preload: [team: t],
select: u
)

all_paid_users =
query
|> Repo.all()
|> Enum.filter(fn user ->
case Billing.get_plan_by_user(user) do
%Billing.Plan{name: "Free"} -> false
_plan -> true
end
emails =
Users.list_users(paying: true, provider: :google)
|> Users.preload_valid_google_team_users()
|> Enum.flat_map(fn user ->
for tu <- user.team.team_users do
tu.email
end ++ [user.email]
end)

valid_paid_users =
all_paid_users
|> Enum.filter(&is_valid_member?/1)
|> List.flatten()

paid_users_team_members =
all_paid_users
|> Enum.map(fn paid_user ->
team_users = TeamUsers.list_team_users_by(team_id: paid_user.team.id)
Enum.filter(team_users, &is_valid_member?/1)
end)
|> List.flatten()
if length(emails) > 1000 do
Logger.warning(
"Number of user emails attached to IAM policy is greater than 1000 (current: #{length(emails)}), taking first 1400"
)
end

(valid_paid_users ++ paid_users_team_members)
|> Enum.sort_by(& &1.updated_at, {:desc, Date})
|> Enum.take(1450)
|> Enum.map(&("user:" <> &1.email))
emails
|> Enum.take(1400)
|> Enum.map(&("user:" <> &1))
end

defp is_valid_member?(%{provider: "google", valid_google_account: true}), do: true
defp is_valid_member?(%{provider: "google", valid_google_account: nil}), do: true
defp is_valid_member?(_), do: false

defp env_project_number, do: Application.get_env(:logflare, Logflare.Google)[:project_number]
defp env_service_account, do: Application.get_env(:logflare, Logflare.Google)[:service_account]
defp env_api_sa, do: Application.get_env(:logflare, Logflare.Google)[:api_sa]
Expand Down
24 changes: 24 additions & 0 deletions lib/logflare/users/users.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ defmodule Logflare.Users do
where(acc, [u], fragment("? -> ?", u.metadata, ^normalized_k) == ^v)
end)

{:provider, :google}, q ->
where(q, [u], u.provider == "google" and u.valid_google_account != false)

{:paying, true}, q ->
join(q, :left, [u], ba in assoc(u, :billing_account))
|> where(
[u, ..., ba],
(not is_nil(ba.stripe_subscriptions) and
fragment("jsonb_array_length(? -> 'data')", ba.stripe_subscriptions) > 0) or
(is_nil(ba) and u.billing_enabled) == false or
ba.lifetime_plan == true
)

_, q ->
q
end)
Expand Down Expand Up @@ -117,6 +130,17 @@ defmodule Logflare.Users do
Repo.preload(user, :team)
end

def preload_team_users(user) do
Repo.preload(user, team: [:team_users])
end

def preload_valid_google_team_users(user) do
query =
from(tu in TeamUser, where: tu.valid_google_account != false and tu.provider == "google")

Repo.preload(user, team: [team_users: query])
end

def preload_billing_account(user) do
Repo.preload(user, :billing_account)
end
Expand Down
82 changes: 77 additions & 5 deletions test/logflare/google/resource_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@ defmodule Logflare.Google.CloudResourceManagerTest do
end

describe "set_iam_policy/0" do
setup do
# Mock IAM API calls for listing existing service accounts
stub(GoogleApi.IAM.V1.Api.Projects, :iam_projects_service_accounts_list, fn
_conn, "projects/" <> _project_id, _opts ->
{:ok, %{accounts: [], nextPageToken: nil}}
end)

:ok
end

test "request body sends expected roles for paying users, paying team users and service accounts",
%{
google_configs: google_configs,
expected_members: expected_members
} do
pid = self()
# Mock IAM API calls for listing existing service accounts
stub(GoogleApi.IAM.V1.Api.Projects, :iam_projects_service_accounts_list, fn
_conn, "projects/" <> _project_id, _opts ->
{:ok, %{accounts: [], nextPageToken: nil}}
end)

stub(
GoogleApi.CloudResourceManager.V1.Api.Projects,
Expand All @@ -51,6 +56,73 @@ defmodule Logflare.Google.CloudResourceManagerTest do
assert Enum.member?(members_list, member)
end
end

test "sets user as invalid google account if user does not exist" do
user = insert(:user, valid_google_account: true)

expect(
GoogleApi.CloudResourceManager.V1.Api.Projects,
:cloudresourcemanager_projects_set_iam_policy,
fn _, _project_number, [body: _body] ->
{:error,
%Tesla.Env{
status: 404,
body: Jason.encode!(%{error: %{message: "User #{user.email} does not exist."}})
}}
end
)

assert ExUnit.CaptureLog.capture_log(fn ->
CloudResourceManager.set_iam_policy(async: false)
end) =~ "marked as invalid"

user = Logflare.Repo.get!(Logflare.User, user.id)
refute user.valid_google_account

expect(
GoogleApi.CloudResourceManager.V1.Api.Projects,
:cloudresourcemanager_projects_set_iam_policy,
fn _, _project_number, [body: _body] ->
{:ok,
%Tesla.Env{
status: 200,
body: ""
}}
end
)

refute ExUnit.CaptureLog.capture_log(fn ->
CloudResourceManager.set_iam_policy(async: false)
end) =~ "marked as invalid"
end

test "sets team_user as invalid google account if user does not exist" do
team_user = insert(:team_user, valid_google_account: true)

expect(
GoogleApi.CloudResourceManager.V1.Api.Projects,
:cloudresourcemanager_projects_set_iam_policy,
fn _, _project_number, [body: _body] ->
{:error,
%Tesla.Env{
status: 404,
body: Jason.encode!(%{error: %{message: "User #{team_user.email} does not exist."}})
}}
end
)

assert ExUnit.CaptureLog.capture_log(fn ->
CloudResourceManager.set_iam_policy(async: false)
end) =~ "marked as invalid"

team_user = Logflare.Repo.get!(Logflare.TeamUsers.TeamUser, team_user.id)
refute team_user.valid_google_account

# attempting to re-set the policy should not raise an error
refute ExUnit.CaptureLog.capture_log(fn ->
CloudResourceManager.set_iam_policy(async: false)
end) =~ "marked as invalid"
end
end

defp setup_test_state() do
Expand Down
6 changes: 5 additions & 1 deletion test/support/factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ defmodule Logflare.Factory do
end

def team_user_factory do
email = "#{TestUtils.random_string(8)}@#{TestUtils.random_string()}.com"

%TeamUser{
name: "some name #{TestUtils.random_string()}",
team: build(:team),
provider: "google"
provider: "google",
email: email,
provider_uid: "provider_uid_#{TestUtils.random_string()}"
}
end

Expand Down
15 changes: 15 additions & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ Mimic.copy(Logflare.Sources.Cache)
Mimic.copy(Logflare.SystemMetrics.AllLogsLogged)
Mimic.copy(Logflare.Users)
Mimic.copy(LogflareWeb.Plugs.RateLimiter)
Mimic.copy(Logflare.Alerting.AlertsScheduler)
Mimic.copy(Stripe.Customer)
Mimic.copy(Stripe.PaymentMethod)
Mimic.copy(Stripe.SubscriptionItem.Usage)
Mimic.copy(GoogleApi.BigQuery.V2.Api.Jobs)
Mimic.copy(GoogleApi.BigQuery.V2.Api.Tabledata)
Mimic.copy(GoogleApi.BigQuery.V2.Api.Tables)
Mimic.copy(GoogleApi.BigQuery.V2.Api.Datasets)
Mimic.copy(GoogleApi.CloudResourceManager.V1.Api.Projects)
Mimic.copy(GoogleApi.IAM.V1.Api.Projects)
Mimic.copy(Goth)
Mimic.copy(ConfigCat)
Mimic.copy(Finch)
Mimic.copy(ExTwilio.Message)
Mimic.copy(Broadway)

{:ok, _} = Application.ensure_all_started(:ex_machina)
{:ok, _} = Application.ensure_all_started(:mimic)
Expand Down
Loading