Skip to content

Commit

Permalink
Remove extra stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
dakota002 committed Aug 7, 2024
1 parent 0392d5e commit 59e78c2
Show file tree
Hide file tree
Showing 9 changed files with 684 additions and 6 deletions.
15 changes: 15 additions & 0 deletions app.arc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ legacy_users
email *String
PointInTimeRecovery true

kafka_acls
topicName *String
cognitoGroup **String
PointInTimeRecovery true

kafka_acl_log
partitionKey *Number
syncedOn **Number
PointInTimeRecovery ture

@tables-indexes
email_notification_subscription
topic *String
Expand Down Expand Up @@ -143,6 +153,11 @@ synonyms
synonymId *String
name synonymsByUuid

kafka_acls
cognitoGroup *String
permissionType **String
name aclsByGroup

@aws
runtime nodejs20.x
region us-east-1
Expand Down
281 changes: 278 additions & 3 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
import { tables } from '@architect/functions'
import { paginateScan } from '@aws-sdk/lib-dynamodb'
import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import { Kafka } from 'gcn-kafka'
import type { AclEntry } from 'kafkajs'
import {
AclOperationTypes,
AclPermissionTypes,
AclResourceTypes,
ResourcePatternTypes,
} from 'kafkajs'
import memoizee from 'memoizee'

import { domain, getEnvOrDie } from './env.server'
import { domain, getEnvOrDieInProduction } from './env.server'
import type { User } from '~/routes/_auth/user.server'

const client_id = getEnvOrDie('KAFKA_CLIENT_ID')
const client_secret = getEnvOrDie('KAFKA_CLIENT_SECRET')
const client_id = getEnvOrDieInProduction('KAFKA_CLIENT_ID') ?? ''
const client_secret = getEnvOrDieInProduction('KAFKA_CLIENT_SECRET')
const kafka = new Kafka({
client_id,
client_secret,
Expand Down Expand Up @@ -68,3 +79,267 @@ if (process.env.ARC_SANDBOX) {
await producer.send({ topic, messages: [{ value }] })
}
}

export type KafkaACL = {
topicName: string
permissionType: PermissionType
cognitoGroup: string
prefixed: boolean
}

export type PermissionType = 'producer' | 'consumer'

export const adminGroup = 'gcn.nasa.gov/gcn-admin'

const consumerOperations = [AclOperationTypes.READ, AclOperationTypes.DESCRIBE]
const producerOperations = [
AclOperationTypes.CREATE,
AclOperationTypes.WRITE,
AclOperationTypes.DESCRIBE,
]

const admin_client_id = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_ID') ?? ''
const admin_client_secret = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_SECRET')
const adminKafka = new Kafka({
client_id: admin_client_id,
client_secret: admin_client_secret,
domain,
})

function validateUser(user: User) {
if (!user.groups.includes(adminGroup))
throw new Response(null, { status: 403 })
}

export async function createKafkaACL(user: User, acl: KafkaACL) {
validateUser(user)
// Save to db
const db = await tables()
await db.kafka_acls.put(acl)

// Add to Kafka
const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.createTopics({
topics: [
{
topic: acl.topicName,
},
],
})
const acls =
acl.permissionType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)
await adminClient.createAcls({ acl: acls })
await adminClient.disconnect()
}

export async function getKafkaACLByTopicName(user: User, topicName: string) {
validateUser(user)
const db = await tables()
return (await db.kafka_acls.get({ topicName })) as KafkaACL
}

export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) {
validateUser(user)
const db = await tables()
const client = db._doc as unknown as DynamoDBDocument
const TableName = db.name('kafka_acls')
const pages = paginateScan(
{ client },
{
TableName,
FilterExpression: filter
? 'contains(topicName, :filter) OR contains(cognitoGroup, :filter)'
: undefined,
ExpressionAttributeValues: filter
? {
':filter': filter,
}
: undefined,
}
)

const acls: KafkaACL[] = []
for await (const page of pages) {
const newACL = page.Items as KafkaACL[]
if (newACL) acls.push(...newACL)
}
return acls
}

export async function getKafkaTopicsForUser(user: User) {
validateUser(user)
const userGroups = user.groups.filter((x) =>
x.startsWith('gcn.nasa.gov/kafka-')
)
const db = await tables()
const items = (
await Promise.all([
...userGroups.map((cognitoGroup) =>
db.kafka_acls.query({
IndexName: 'aclsByGroup',
KeyConditionExpression:
'cognitoGroup = :group AND permissionType = :permission',
ProjectionExpression: 'topicName',
ExpressionAttributeValues: {
':group': cognitoGroup,
':permission': 'consumer',
},
})
),
])
)
.filter((x) => x.Count && x.Count > 0)
.flatMap((x) => x.Items)
.map((x) => x.topicName)

return items
}
export async function getAclsFromBrokers() {
const adminClient = adminKafka.admin()
await adminClient.connect()
const acls = await adminClient.describeAcls({
resourceType: AclResourceTypes.TOPIC,
host: '*',
permissionType: AclPermissionTypes.ALLOW,
operation: AclOperationTypes.ANY,
resourcePatternType: ResourcePatternTypes.ANY,
})
await adminClient.disconnect()
const results: KafkaACL[] = []
for (const item of acls.resources) {
const topicName = item.resourceName
const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED
const producerRules = producerOperations.every((op) =>
item.acls.map((x) => x.operation).includes(op)
)
const producerGroup =
producerRules &&
[
...new Set(
item.acls
.filter((acl) => producerOperations.includes(acl.operation))
.map((x) => x.principal)
),
][0]?.replace('User:', '')
const consumerRules = consumerOperations.every((op) =>
item.acls.map((x) => x.operation).includes(op)
)
const consumerGroup =
consumerRules &&
[
...new Set(
item.acls
.filter((acl) => consumerOperations.includes(acl.operation))
.map((x) => x.principal)
),
][0]?.replace('User:', '')
if (producerRules && producerGroup)
results.push({
topicName,
permissionType: 'producer',
cognitoGroup: producerGroup,
prefixed,
})
if (consumerRules && consumerGroup)
results.push({
topicName,
permissionType: 'consumer',
cognitoGroup: consumerGroup,
prefixed,
})
}
return results
}

export async function deleteKafkaACL(user: User, acl: KafkaACL) {
validateUser(user)
const db = await tables()
await db.kafka_acls.delete({
topicName: acl.topicName,
cognitoGroup: acl.cognitoGroup,
})

const acls =
acl.permissionType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)

const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.deleteAcls({ filters: acls })
await adminClient.disconnect()
}

function createProducerAcls(acl: KafkaACL): AclEntry[] {
// Create, Write, and Describe operations
return mapAclAndOperations(acl, producerOperations)
}

function createConsumerAcls(acl: KafkaACL): AclEntry[] {
// Read and Describe operations
return mapAclAndOperations(acl, consumerOperations)
}

function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) {
return operations.map((operation) => {
return {
resourceType: AclResourceTypes.TOPIC,
resourceName: acl.topicName,
resourcePatternType: acl.prefixed
? ResourcePatternTypes.PREFIXED
: ResourcePatternTypes.LITERAL,
principal: `User:${acl.cognitoGroup}`,
host: '*',
operation,
permissionType: AclPermissionTypes.ALLOW,
}
})
}

export async function updateBrokersFromDb(user: User) {
const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user)
const mappedAcls = dbDefinedAcls.flatMap((x) =>
x.permissionType === 'producer'
? createProducerAcls(x)
: createConsumerAcls(x)
)

const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.createAcls({ acl: mappedAcls })
await adminClient.disconnect()
}

export async function updateDbFromBrokers(user: User) {
const kafkaDefinedAcls = await getAclsFromBrokers()
const db = await tables()
await Promise.all([
...kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)),
db.kafka_acl_log.put({
partitionKey: 1,
syncedOn: Date.now(),
syncedBy: user.email,
}),
])
}

type KafkaAclSyncLog = {
partitionKey: number
syncedOn: number
syncedBy: string
}

export async function getLastSyncDate(): Promise<KafkaAclSyncLog> {
const db = await tables()
return (
await db.kafka_acl_log.query({
KeyConditionExpression: 'partitionKey = :1',
ExpressionAttributeValues: { ':1': 1 },
ScanIndexForward: false,
Limit: 1,
})
).Items.pop() as KafkaAclSyncLog
}
8 changes: 8 additions & 0 deletions app/root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { useSpinDelay } from 'spin-delay'
import invariant from 'tiny-invariant'

import { features, getEnvOrDieInProduction, origin } from './lib/env.server'
import { adminGroup } from './lib/kafka.server'
import { DevBanner } from './root/DevBanner'
import { Footer } from './root/Footer'
import NewsBanner from './root/NewsBanner'
Expand Down Expand Up @@ -119,6 +120,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY')
const userIsMod = user?.groups.includes(moderatorGroup)
const userIsVerifiedSubmitter = user?.groups.includes(submitterGroup)
const userIsAdmin = user?.groups.includes(adminGroup)

return {
origin,
Expand All @@ -129,6 +131,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
idp,
userIsMod,
userIsVerifiedSubmitter,
userIsAdmin,
}
}

Expand Down Expand Up @@ -168,6 +171,11 @@ export function useSubmitterStatus() {
return userIsVerifiedSubmitter
}

export function useAdminStatus() {
const { userIsAdmin } = useLoaderDataRoot()
return userIsAdmin
}

export function useRecaptchaSiteKey() {
const { recaptchaSiteKey } = useLoaderDataRoot()
return recaptchaSiteKey
Expand Down
8 changes: 7 additions & 1 deletion app/root/header/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { useEffect, useState } from 'react'
import { useClickAnyWhere, useWindowSize } from 'usehooks-ts'

import { Meatball } from '~/components/meatball/Meatball'
import { useEmail, useUserIdp } from '~/root'
import { useAdminStatus, useEmail, useUserIdp } from '~/root'

import styles from './header.module.css'

Expand Down Expand Up @@ -74,6 +74,7 @@ export function Header() {
const [expanded, setExpanded] = useState(false)
const [userMenuIsOpen, setUserMenuIsOpen] = useState(false)
const isMobile = useWindowSize().width < 1024
const userIsAdmin = useAdminStatus()

function toggleMobileNav() {
setExpanded((expanded) => !expanded)
Expand Down Expand Up @@ -162,6 +163,11 @@ export function Header() {
<NavLink end key="user" to="/user">
Profile
</NavLink>,
userIsAdmin && (
<NavLink key="admin" to="/admin/kafka">
Admin
</NavLink>
),
<NavLink key="endorsements" to="/user/endorsements">
Peer Endorsements
</NavLink>,
Expand Down
Loading

0 comments on commit 59e78c2

Please sign in to comment.