Skip to content

Commit

Permalink
Merge branch 'master' into btkcodedev/builder-contribute/source-shortcut
Browse files Browse the repository at this point in the history
  • Loading branch information
btkcodedev authored Sep 8, 2024
2 parents 428b44a + 4420264 commit 06479a1
Show file tree
Hide file tree
Showing 551 changed files with 16,317 additions and 13,546 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.64.2
current_version = 0.64.3
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
54 changes: 0 additions & 54 deletions .github/workflows/community_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,60 +76,6 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
subcommand: "format check all"
is_fork: "true"

check-review-requirements:
name: Check if a review is required from Connector teams on fork
if: github.event.pull_request.head.repo.fork == true
environment: community-ci-auto
runs-on: community-tooling-test-small
needs: fail_on_protected_path_changes
timeout-minutes: 10
env:
MAIN_BRANCH_NAME: "master"
permissions:
pull-requests: write
steps:
# This checkouts a fork which can contain untrusted code
# It's deemed safe as the review required check is not executing any checked out code
- name: Checkout fork
uses: actions/checkout@v4
with:
repository: ${{ github.event.pull_request.head.repo.full_name }}
ref: ${{ github.event.pull_request.head.sha }}
fetch-depth: 1
# This will sync the .github folder of the main repo with the fork
# This allows us to use up to date actions and CI logic from the main repo
- name: Pull .github folder from main repository
id: pull_github_folder
run: |
git remote add main https://github.com/airbytehq/airbyte.git
git fetch main ${MAIN_BRANCH_NAME}
git checkout main/${MAIN_BRANCH_NAME} -- .github
git checkout main/${MAIN_BRANCH_NAME} -- airbyte-ci
- name: Install Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install ci-connector-ops package
run: |
pip install pipx
pipx ensurepath
pipx install airbyte-ci/connectors/connector_ops
- name: Write review requirements file
id: write-review-requirements-file
run: write-review-requirements-file >> $GITHUB_OUTPUT
- name: Get mandatory reviewers
id: get-mandatory-reviewers
run: print-mandatory-reviewers >> $GITHUB_OUTPUT
- name: Check if the review requirements are met
if: steps.write-review-requirements-file.outputs.CREATED_REQUIREMENTS_FILE == 'true'
uses: Automattic/action-required-review@v3
with:
status: ${{ steps.get-mandatory-reviewers.outputs.MANDATORY_REVIEWERS }}
token: ${{ secrets.OCTAVIA_4_ROOT_ACCESS }}
request-reviews: true
requirements-file: .github/connector_org_review_requirements.yaml

connectors_early_ci:
name: Run connectors early CI on fork
if: github.event.pull_request.head.repo.fork == true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

package io.airbyte.cdk.state

import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
* Manages memory usage for the destination.
Expand All @@ -17,31 +19,49 @@ import kotlin.concurrent.withLock
* TODO: Some degree of logging/monitoring around how accurate we're actually being?
*/
@Singleton
class MemoryManager {
private val availableMemoryBytes: Long = Runtime.getRuntime().maxMemory()
class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) {
private val totalMemoryBytes: Long = availableMemoryProvider.availableMemoryBytes
private var usedMemoryBytes = AtomicLong(0L)
private val memoryLock = ReentrantLock()
private val memoryLockCondition = memoryLock.newCondition()
private val mutex = Mutex()
private val syncChannel = Channel<Unit>(Channel.UNLIMITED)

val remainingMemoryBytes: Long
get() = totalMemoryBytes - usedMemoryBytes.get()

/* Attempt to reserve memory. If enough memory is not available, waits until it is, then reserves. */
suspend fun reserveBlocking(memoryBytes: Long) {
memoryLock.withLock {
while (usedMemoryBytes.get() + memoryBytes > availableMemoryBytes) {
memoryLockCondition.await()
if (memoryBytes > totalMemoryBytes) {
throw IllegalArgumentException(
"Requested ${memoryBytes}b memory exceeds ${totalMemoryBytes}b total"
)
}

mutex.withLock {
while (usedMemoryBytes.get() + memoryBytes > totalMemoryBytes) {
syncChannel.receive()
}
usedMemoryBytes.addAndGet(memoryBytes)
}
}

suspend fun reserveRatio(ratio: Double): Long {
val estimatedSize = (availableMemoryBytes.toDouble() * ratio).toLong()
val estimatedSize = (totalMemoryBytes.toDouble() * ratio).toLong()
reserveBlocking(estimatedSize)
return estimatedSize
}

fun release(memoryBytes: Long) {
memoryLock.withLock {
usedMemoryBytes.addAndGet(-memoryBytes)
memoryLockCondition.signalAll()
}
suspend fun release(memoryBytes: Long) {
usedMemoryBytes.addAndGet(-memoryBytes)
syncChannel.send(Unit)
}
}

/**
 * Source of the memory budget handed to [MemoryManager].
 *
 * [MemoryManager] reads [availableMemoryBytes] in a property initializer and uses that value as
 * its total; the interface exists so the budget can be supplied by something other than the JVM
 * runtime (for example a fixed-size provider in tests).
 */
interface AvailableMemoryProvider {
/** Number of bytes the consumer may treat as its total memory budget. */
val availableMemoryBytes: Long
}

/**
 * [AvailableMemoryProvider] backed by the JVM: reports `Runtime.getRuntime().maxMemory()`.
 *
 * Annotated `@Secondary`, which in Micronaut marks this bean as the fallback candidate when
 * another [AvailableMemoryProvider] bean is also present.
 */
@Singleton
@Secondary
class JavaRuntimeAvailableMemoryProvider : AvailableMemoryProvider {
// Evaluated once, when the instance is constructed (property initializer, not a getter).
override val availableMemoryBytes: Long = Runtime.getRuntime().maxMemory()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.state

import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

/**
 * Tests for [MemoryManager] against a fixed 1000-byte budget.
 *
 * Every test constructs its own [MemoryManager] directly from [MockAvailableMemoryProvider], so
 * the assertions below do not depend on what the Micronaut context injects.
 */
@MicronautTest
class MemoryManagerTest {
    /**
     * Fixed-size provider used in the `test` environment.
     *
     * It replaces the JVM-backed provider. The annotation previously named [MemoryManager], which
     * is a different bean type than this class provides and would have removed the manager bean
     * from the context rather than swapping the provider.
     */
    @Singleton
    @Replaces(JavaRuntimeAvailableMemoryProvider::class)
    @Requires(env = ["test"])
    class MockAvailableMemoryProvider : AvailableMemoryProvider {
        override val availableMemoryBytes: Long = 1000
    }

    /**
     * Reserves 900 of 1000 bytes, starts a job that wants 200 more, then releases 50 + 25 + 25
     * bytes and checks that the job has only completed once the full 200 bytes are free.
     */
    @Test
    fun testReserveBlocking() = runTest {
        val memoryManager = MemoryManager(MockAvailableMemoryProvider())
        val reserved = AtomicBoolean(false)

        try {
            withTimeout(5000) { memoryManager.reserveBlocking(900) }
        } catch (e: Exception) {
            Assertions.fail<Unit>("Failed to reserve memory")
        }

        Assertions.assertEquals(100, memoryManager.remainingMemoryBytes)

        // Wants 200 bytes while only 100 remain, so it must not finish yet.
        val job = launch {
            memoryManager.reserveBlocking(200)
            reserved.set(true)
        }

        // Zero-byte reservations are used as checkpoints before each assertion.
        // NOTE(review): whether the launched job has actually been scheduled by this point
        // depends on the runTest dispatcher - confirm.
        memoryManager.reserveBlocking(0)
        Assertions.assertFalse(reserved.get())

        // 150 bytes free: still short of 200.
        memoryManager.release(50)
        memoryManager.reserveBlocking(0)
        Assertions.assertEquals(150, memoryManager.remainingMemoryBytes)
        Assertions.assertFalse(reserved.get())

        // 175 bytes free: still short of 200.
        memoryManager.release(25)
        memoryManager.reserveBlocking(0)
        Assertions.assertEquals(175, memoryManager.remainingMemoryBytes)
        Assertions.assertFalse(reserved.get())

        // 200 bytes free: the pending reservation can now go through.
        memoryManager.release(25)
        try {
            withTimeout(5000) { job.join() }
        } catch (e: Exception) {
            Assertions.fail<Unit>("Failed to unblock reserving memory")
        }
        Assertions.assertEquals(0, memoryManager.remainingMemoryBytes)
        Assertions.assertTrue(reserved.get())
    }

    /**
     * Fills the budget, launches 100000 ten-byte reservations on [Dispatchers.IO], and releases
     * ten bytes 100000 times, checking that the remaining budget never goes negative and ends
     * at zero once every job has joined.
     */
    @Test
    fun testReserveBlockingMultithreaded() = runTest {
        val memoryManager = MemoryManager(MockAvailableMemoryProvider())
        withContext(Dispatchers.IO) {
            memoryManager.reserveBlocking(1000)
            Assertions.assertEquals(0, memoryManager.remainingMemoryBytes)
            val nIterations = 100000

            val jobs = (0 until nIterations).map { launch { memoryManager.reserveBlocking(10) } }

            repeat(nIterations) {
                memoryManager.release(10)
                Assertions.assertTrue(
                    memoryManager.remainingMemoryBytes >= 0,
                    "Remaining memory is negative: ${memoryManager.remainingMemoryBytes}"
                )
            }
            jobs.forEach { it.join() }
            // Reservations and releases cancel out, leaving the initial 1000 bytes reserved.
            Assertions.assertEquals(0, memoryManager.remainingMemoryBytes)
        }
    }

    /** A request larger than the whole budget must fail with [IllegalArgumentException]. */
    @Test
    fun testRequestingMoreThanAvailableThrows() = runTest {
        val memoryManager = MemoryManager(MockAvailableMemoryProvider())
        try {
            memoryManager.reserveBlocking(1001)
        } catch (e: IllegalArgumentException) {
            return@runTest
        }
        Assertions.fail<Unit>("Requesting more memory than available should throw an exception")
    }
}
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
| :--------- | :--------- | :----------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.44.21 | 2024-09-04 | [\#45143](https://github.com/airbytehq/airbyte/pull/45143) | S3-destination: don't overwrite existing files, skip those file indexes instead |
| 0.44.20 | 2024-08-30 | [\#44933](https://github.com/airbytehq/airbyte/pull/44933) | Avro/Parquet destinations: handle `{}` schemas inside objects/arrays |
| 0.44.19 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase Jackson message length limit to 100mb |
| 0.44.18 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Improve handling of incoming debezium change events |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.20
version=0.44.21
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ open class S3StorageOperations(
private val s3FilenameTemplateManager: S3FilenameTemplateManager = S3FilenameTemplateManager()

private val partCounts: ConcurrentMap<String, AtomicInteger> = ConcurrentHashMap()
private val objectNameByPrefix: ConcurrentMap<String, Set<String>> = ConcurrentHashMap()

override fun getBucketObjectPath(
namespace: String?,
Expand Down Expand Up @@ -167,6 +168,32 @@ open class S3StorageOperations(
* @return the uploaded filename, which is different from the serialized buffer filename
* </extension></partId>
*/
@VisibleForTesting
fun getFileName(
    objectPath: String,
    recordsData: SerializableBuffer,
): String {
    // Builds an object key for the buffer under objectPath, asking getPartId for a new part id
    // on every attempt until the resulting key is not among the names recorded for this prefix.
    while (true) {
        val nextPartId: String = getPartId(objectPath)
        val extension: String = getExtension(recordsData.filename)
        val candidateKey: String =
            if (s3Config.fileNamePattern.isNullOrBlank()) {
                // No custom pattern configured: plain concatenation.
                objectPath + nextPartId + extension
            } else {
                val templateParams =
                    S3FilenameTemplateParameterObject.builder()
                        .partId(nextPartId)
                        .recordsData(recordsData)
                        .objectPath(objectPath)
                        .fileExtension(extension)
                        .fileNamePattern(s3Config.fileNamePattern)
                        .build()
                s3FilenameTemplateManager.applyPatternToFilename(templateParams)
            }
        // getValue throws if objectPath has no entry; presumably getPartId has populated it
        // by this point - confirm against getPartId.
        val knownNames = objectNameByPrefix.getValue(objectPath)
        if (!knownNames.contains(candidateKey)) {
            return candidateKey
        }
    }
}
@Throws(IOException::class)
private fun loadDataIntoBucket(
objectPath: String,
Expand All @@ -175,22 +202,7 @@ open class S3StorageOperations(
): String {
val partSize: Long = DEFAULT_PART_SIZE.toLong()
val bucket: String? = s3Config.bucketName
val partId: String = getPartId(objectPath)
val fileExtension: String = getExtension(recordsData.filename)
val fullObjectKey: String =
if (!s3Config.fileNamePattern.isNullOrBlank()) {
s3FilenameTemplateManager.applyPatternToFilename(
S3FilenameTemplateParameterObject.builder()
.partId(partId)
.recordsData(recordsData)
.objectPath(objectPath)
.fileExtension(fileExtension)
.fileNamePattern(s3Config.fileNamePattern)
.build(),
)
} else {
objectPath + partId + fileExtension
}
val fullObjectKey: String = getFileName(objectPath, recordsData)
val metadata: MutableMap<String, String> = HashMap()
for (blobDecorator: BlobDecorator in blobDecorators) {
blobDecorator.updateMetadata(metadata, getMetadataMapping())
Expand Down Expand Up @@ -263,31 +275,14 @@ open class S3StorageOperations(
) {
AtomicInteger(0)
}

if (partCount.get() == 0) {
var objects: ObjectListing?
var objectCount = 0

val bucket: String? = s3Config.bucketName
objects = s3Client.listObjects(bucket, objectPath)

if (objects != null) {
objectCount += objects.objectSummaries.size
while (objects != null && objects.nextMarker != null) {
objects =
s3Client.listObjects(
ListObjectsRequest()
.withBucketName(bucket)
.withPrefix(objectPath)
.withMarker(objects.nextMarker),
)
if (objects != null) {
objectCount += objects.objectSummaries.size
}
}
objectNameByPrefix.computeIfAbsent(
objectPath,
) {
var objectList: Set<String> = setOf()
forObjectsByPage(objectPath) { objectSummaries ->
objectList = objectList + objectSummaries.map { it.key }
}

partCount.set(objectCount)
objectList
}

return partCount.getAndIncrement().toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonRecordIdentityM
import io.airbyte.commons.jackson.MoreMappers

class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() {
private fun mapCommon(record: JsonNode?, matchingOption: ObjectNode): JsonNode? {
private fun mapCommon(record: JsonNode?, matchingOption: ObjectNode): ObjectNode {
val newObj = MoreMappers.initMapper().createObjectNode()

val propertyName = JsonSchemaParquetPreprocessor.typeFieldName(matchingOption)
Expand All @@ -24,7 +24,7 @@ class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() {
return newObj
}

override fun mapUnion(record: JsonNode?, schema: ObjectNode): JsonNode? {
override fun mapUnion(record: JsonNode?, schema: ObjectNode): ObjectNode? {
if (record == null || record.isNull) {
return null
}
Expand All @@ -35,7 +35,7 @@ class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() {
return mapCommon(record, matchingOption)
}

override fun mapCombined(record: JsonNode?, schema: ObjectNode): JsonNode? {
override fun mapCombined(record: JsonNode?, schema: ObjectNode): ObjectNode? {
if (record == null || record.isNull) {
return null
}
Expand Down
Loading

0 comments on commit 06479a1

Please sign in to comment.