Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions ui/src/modules/common/Modals/IngestionModeChangeModal.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Button, Modal } from "antd"
import { useAppStore } from "../../../store"
import { IngestionModeChangeModalProps } from "../../../types/modalTypes"

const IngestionModeChangeModal = ({
onConfirm,
ingestionMode,
}: IngestionModeChangeModalProps) => {
const { showIngestionModeChangeModal, setShowIngestionModeChangeModal } =
useAppStore()

return (
<Modal
open={showIngestionModeChangeModal}
footer={null}
closable={false}
width={400}
centered
>
<div className="flex w-full flex-col justify-center">
<div className="text-xl font-medium text-black">
Switch to {ingestionMode} for all tables ?
</div>

<div className="mt-2 text-black">
<div className="text-sm">
All tables will be switched to {ingestionMode} mode,
</div>
<div className="text-sm">
You can change mode for individual tables
</div>
</div>

<div className="mt-7 flex w-full gap-4">
<Button
type="primary"
onClick={() => {
setShowIngestionModeChangeModal(false)
onConfirm(ingestionMode)
}}
>
Confirm
</Button>
<Button
type="default"
onClick={() => setShowIngestionModeChangeModal(false)}
>
Close
</Button>
</div>
</div>
</Modal>
)
}

export default IngestionModeChangeModal
64 changes: 63 additions & 1 deletion ui/src/modules/jobs/pages/SchemaConfiguration.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { sourceService } from "../../../api"
import { useAppStore } from "../../../store"
import {
CombinedStreamsData,
IngestionMode,
SchemaConfigurationProps,
SelectedStream,
StreamData,
Expand Down Expand Up @@ -45,7 +46,8 @@ const SchemaConfiguration: React.FC<SchemaConfigurationProps> = ({
onLoadingChange,
}) => {
const prevSourceConfig = useRef(sourceConfig)
const { setShowDestinationDatabaseModal } = useAppStore()
const { setShowDestinationDatabaseModal, ingestionMode, setIngestionMode } =
useAppStore()
const [searchText, setSearchText] = useState("")
const [selectedFilters, setSelectedFilters] = useState<string[]>([
"All tables",
Expand Down Expand Up @@ -358,6 +360,7 @@ const SchemaConfiguration: React.FC<SchemaConfigurationProps> = ({
normalization: false,
filter: "",
disabled: false,
append_mode: ingestionMode === IngestionMode.APPEND,
},
]
changed = true
Expand Down Expand Up @@ -435,6 +438,63 @@ const SchemaConfiguration: React.FC<SchemaConfigurationProps> = ({
})
}

const handleIngestionModeChange = (
streamName: string,
namespace: string,
appendMode: boolean,
) => {
setApiResponse(prev => {
if (!prev) return prev

const streamExistsInSelected = prev.selected_streams[namespace]?.some(
s => s.stream_name === streamName,
)

if (!streamExistsInSelected) return prev

const updatedSelectedStreams = {
...prev.selected_streams,
[namespace]: prev.selected_streams[namespace].map(s =>
s.stream_name === streamName ? { ...s, append_mode: appendMode } : s,
),
}

const updated = {
...prev,
selected_streams: updatedSelectedStreams,
}

setSelectedStreams(updated)
return updated
})
}

const handleAllIngestionModeChange = (ingestionMode: IngestionMode) => {
const appendMode = ingestionMode === IngestionMode.APPEND
setIngestionMode(ingestionMode)
setApiResponse(prev => {
if (!prev) return prev

// Update all streams with the same append mode
const updateSelectedStreams = Object.fromEntries(
Object.entries(prev.selected_streams).map(([namespace, streams]) => [
namespace,
streams.map(stream => ({
...stream,
append_mode: appendMode,
})),
]),
)

const updated = {
...prev,
selected_streams: updateSelectedStreams,
}
setSelectedStreams(updated)
return updated
})
}

const filteredStreams = useMemo(() => {
if (!apiResponse?.streams) return []
let tempFilteredStreams = [...apiResponse.streams]
Expand Down Expand Up @@ -687,6 +747,7 @@ const SchemaConfiguration: React.FC<SchemaConfigurationProps> = ({
// Pass it to the parent component
setSelectedStreams(fullData as CombinedStreamsData)
}}
onIngestionModeChange={handleAllIngestionModeChange}
/>
) : loading ? (
<div className="flex h-[calc(100vh-250px)] items-center justify-center">
Expand Down Expand Up @@ -740,6 +801,7 @@ const SchemaConfiguration: React.FC<SchemaConfigurationProps> = ({
fromJobEditFlow={fromJobEditFlow}
initialSelectedStreams={apiResponse || undefined}
destinationType={destinationType}
onIngestionModeChange={handleIngestionModeChange}
/>
) : null}
</div>
Expand Down
66 changes: 63 additions & 3 deletions ui/src/modules/jobs/pages/streams/StreamConfiguration.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import {
import {
ColumnsPlusRightIcon,
GridFourIcon,
InfoIcon,
LightningIcon,
PlusIcon,
SlidersHorizontalIcon,
XIcon,
ArrowSquareOutIcon,
InfoIcon,
} from "@phosphor-icons/react"

import {
Expand All @@ -30,6 +30,7 @@ import {
MultiFilterCondition,
CombinedStreamsData,
SyncMode,
IngestionMode,
} from "../../../../types"

import {
Expand Down Expand Up @@ -57,9 +58,11 @@ const StreamConfiguration = ({
fromJobEditFlow = false,
initialSelectedStreams,
destinationType = DESTINATION_INTERNAL_TYPES.S3,
onIngestionModeChange,
}: ExtendedStreamConfigurationProps) => {
const [activeTab, setActiveTab] = useState("config")
const [syncMode, setSyncMode] = useState(stream.stream.sync_mode)
const [appendMode, setAppendMode] = useState(false)
const [normalization, setNormalization] =
useState<boolean>(initialNormalization)
const [fullLoadFilter, setFullLoadFilter] = useState<boolean>(false)
Expand Down Expand Up @@ -141,7 +144,13 @@ const StreamConfiguration = ({
partition_regex: initialPartitionRegex || "",
fullLoadFilter: formData.fullLoadFilter || false,
}))
}, [stream, initialNormalization])

setAppendMode(
initialSelectedStreams?.selected_streams?.[
stream.stream.namespace || ""
]?.find(s => s.stream_name === stream.stream.name)?.append_mode || false,
)
}, [stream, initialNormalization, initialSelectedStreams])

useEffect(() => {
// Skip when change originated from local user action
Expand Down Expand Up @@ -249,6 +258,15 @@ const StreamConfiguration = ({
})
}

const handleIngestionModeChange = (ingestionMode: IngestionMode) => {
setAppendMode(ingestionMode === IngestionMode.APPEND)
onIngestionModeChange(
stream.stream.name,
stream.stream.namespace || "",
ingestionMode === IngestionMode.APPEND,
)
}

const handleNormalizationChange = (checked: boolean) => {
setNormalization(checked)
onNormalizationChange(
Expand Down Expand Up @@ -507,7 +525,13 @@ const StreamConfiguration = ({
? columnType.find(t => t !== "null") || columnType[0]
: columnType

if (primaryType === "string" || primaryType === "timestamp") {
if (
primaryType === "string" ||
primaryType === "timestamp" ||
primaryType === "timestamp_micro" ||
primaryType === "timestamp_nano" ||
primaryType === "timestamp_milli"
) {
// Check if value is already wrapped in quotes
if (!value.startsWith('"') && !value.endsWith('"')) {
return `"${value}"`
Expand Down Expand Up @@ -729,6 +753,42 @@ const StreamConfiguration = ({
</div>
)}
</div>

<div
className={clsx(
"mb-4",
isSelected
? "font-medium text-neutral-text"
: "font-normal text-gray-500",
)}
>
<div className="mb-3">
<label className="block w-full">Ingestion Mode:</label>
<div
className={clsx(
"text-xs",
!isSelected ? "text-gray-500" : "text-neutral-700",
)}
>
Specify how the data will be ingested in the destination
</div>
</div>
<Radio.Group
disabled={!isSelected}
className="mb-4 grid grid-cols-2 gap-4"
value={appendMode ? IngestionMode.APPEND : IngestionMode.UPSERT}
onChange={e => handleIngestionModeChange(e.target.value)}
>
<Radio value={IngestionMode.UPSERT}>Upsert</Radio>
<Radio value={IngestionMode.APPEND}>Append</Radio>
</Radio.Group>
{!isSelected && (
<div className="flex items-center gap-1 text-sm text-[#686868]">
<InfoIcon className="size-4" />
Select the stream to configure ingestion mode
</div>
)}
</div>
</div>

<div
Expand Down
2 changes: 1 addition & 1 deletion ui/src/modules/jobs/pages/streams/StreamHeader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const StreamHeader: React.FC<StreamHeaderProps> = ({
"flex w-full items-center justify-between border-b border-solid border-[#e5e7eb] py-3 pl-6",
isActiveStream
? "bg-primary-100"
: "bg-white hover:bg-background-primary",
: "border-l border-r bg-white hover:bg-background-primary",
)}
>
<div
Expand Down
Loading
Loading