Skip to content

Commit

Permalink
fix violations
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Oct 21, 2022
1 parent 947fb28 commit c72a0df
Show file tree
Hide file tree
Showing 18 changed files with 172 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.bcloader;

import static org.apache.pulsar.common.util.SecurityUtility.BC_FIPS;

import java.security.Provider;
import java.security.Security;
import lombok.extern.slf4j.Slf4j;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.bcloader;
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@
*/
public class MockitoCleanupListener extends BetweenTestClassesListenerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(MockitoCleanupListener.class);
private static final boolean
MOCKITO_CLEANUP_ENABLED = Boolean.parseBoolean(System.getProperty("testMockitoCleanup", "true"));
private static final boolean MOCKITO_CLEANUP_ENABLED = Boolean.parseBoolean(
System.getProperty("testMockitoCleanup", "true"));

private static final String MOCKITO_CLEANUP_INFO =
"Cleaning up Mockito's ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER thread local state.";

@Override
protected void onBetweenTestClasses(Class<?> endedTestClass, Class<?> startedTestClass) {
if (MOCKITO_CLEANUP_ENABLED) {
try {
if (MockitoThreadLocalStateCleaner.INSTANCE.isEnabled()) {
LOG.info(
"Cleaning up Mockito's ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER thread local state.");
LOG.info(MOCKITO_CLEANUP_INFO);
MockitoThreadLocalStateCleaner.INSTANCE.cleanup();
}
} finally {
Expand All @@ -53,7 +55,7 @@ protected void onBetweenTestClasses(Class<?> endedTestClass, Class<?> startedTes
/**
* Mockito-inline can leak mocked objects, we need to clean up the inline mocks after every test.
* See <a href="https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#47"}>
* mockito docs</a>.
* mockito docs</a>.
*/
private void cleanupMockitoInline() {
Mockito.framework().clearInlineMocks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@
package org.apache.pulsar.jclouds;

import com.google.inject.AbstractModule;
import java.util.ArrayList;
import java.util.List;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.jclouds.ContextBuilder;
import org.jclouds.http.apachehc.config.ApacheHCHttpCommandExecutorServiceModule;
import org.jclouds.http.okhttp.config.OkHttpCommandExecutorServiceModule;
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;

import java.util.ArrayList;
import java.util.List;

/**
* This utility class helps in dealing with shaded dependencies (especially Guice).
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.jclouds;
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.client.cli;

import static org.testng.Assert.assertEquals;

import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
Expand All @@ -28,8 +27,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.beust.jcommander.JCommander;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
Expand Down Expand Up @@ -334,7 +333,12 @@ private void internalFlush(Deque<Record<T>> swapList) throws SQLException {
}
}

private void internalFlushBatch(Deque<Record<T>> swapList, PreparedStatement currentBatch, int count, long start) throws SQLException {
private void internalFlushBatch(
Deque<Record<T>> swapList,
PreparedStatement currentBatch,
int count,
long start
) throws SQLException {
executeBatch(swapList, currentBatch);
if (log.isDebugEnabled()) {
log.debug("Flushed {} messages in {} ms", count, (System.nanoTime() - start) / 1000 / 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,20 @@ public CompletableFuture<Void> updateMeta(PackageName packageName, PackageMetada
future.completeExceptionally(throwable);
return;
}
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(PackageMetadataUtil.toBytes(metadata))) {
try (ByteArrayInputStream in = new ByteArrayInputStream(PackageMetadataUtil.toBytes(metadata))) {
storage.deleteAsync(metadataPath)
.thenCompose(aVoid -> storage.writeAsync(metadataPath, inputStream))
.thenCompose(aVoid -> storage.writeAsync(metadataPath, in))
.whenComplete((aVoid, t) -> {
if (t != null) {
future.completeExceptionally(new PackagesManagementException(
String.format("Update package '%s' metadata failed", packageName.toString()), t));
String.format("Update package '%s' metadata failed", packageName), t));
} else {
future.complete(null);
}
});
} catch (IOException e) {
future.completeExceptionally(new PackagesManagementException(
String.format("Read package '%s' metadata failed", packageName.toString()), e));
String.format("Read package '%s' metadata failed", packageName), e));
}
});
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.tests.integration.functions;

import java.io.ByteArrayOutputStream;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
Expand All @@ -33,11 +35,8 @@
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;

import java.io.ByteArrayOutputStream;
import java.util.stream.Collectors;

/**
* This function removes a "field" from a AVRO message
* This function removes a "field" from a AVRO message.
*/
@Slf4j
public class RemoveAvroFieldFunction implements Function<GenericObject, Void> {
Expand Down Expand Up @@ -70,18 +69,22 @@ public Void process(GenericObject genericObject, Context context) throws Excepti
org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), originalAvroSchema.isError(),
originalAvroSchema.getName(),
originalAvroSchema.getDoc(),
originalAvroSchema.getNamespace(),
originalAvroSchema.isError(),
originalAvroSchema.getFields().
stream()
.filter(f->!f.name().equals(FIELD_TO_REMOVE))
.map(f-> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
.map(f-> new org.apache.avro.Schema.Field(
f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
.collect(Collectors.toList()));

KeyValue originalObject = (KeyValue) nativeObject;

GenericRecord value = (GenericRecord) originalObject.getValue();
org.apache.avro.generic.GenericRecord genericRecord
= (org.apache.avro.generic.GenericRecord) value.getNativeObject();
org.apache.avro.generic.GenericRecord genericRecord =
(org.apache.avro.generic.GenericRecord) value.getNativeObject();

org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
for (org.apache.avro.Schema.Field field : modified.getFields()) {
Expand All @@ -105,15 +108,19 @@ public Void process(GenericObject genericObject, Context context) throws Excepti
org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), originalAvroSchema.isError(),
originalAvroSchema.getName(),
originalAvroSchema.getDoc(),
originalAvroSchema.getNamespace(),
originalAvroSchema.isError(),
originalAvroSchema.getFields().
stream()
.filter(f -> !f.name().equals(FIELD_TO_REMOVE))
.map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
.map(f -> new org.apache.avro.Schema.Field(
f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
.collect(Collectors.toList()));

org.apache.avro.generic.GenericRecord genericRecord
= (org.apache.avro.generic.GenericRecord) nativeObject;
org.apache.avro.generic.GenericRecord genericRecord =
(org.apache.avro.generic.GenericRecord) nativeObject;
org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
for (org.apache.avro.Schema.Field field : modified.getFields()) {
newRecord.put(field.name(), genericRecord.get(field.name()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.functions;
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.io;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericObject;
Expand All @@ -28,7 +29,6 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import java.util.Map;

@Slf4j
public class TestGenericObjectSink implements Sink<GenericObject> {
Expand All @@ -49,7 +49,11 @@ public void write(Record<GenericObject> record) {
String expectedRecordType = record.getProperties().getOrDefault("expectedType", "MISSING");
log.info("expectedRecordType {}", expectedRecordType);
if (!expectedRecordType.equals(record.getSchema().getSchemaInfo().getType().name())) {
throw new RuntimeException("Unexpected record type " + record.getSchema().getSchemaInfo().getType().name() + " is not " + expectedRecordType);
final String message = String.format(
"Unexpected record type %s is not %s",
record.getSchema().getSchemaInfo().getType().name(),
expectedRecordType);
throw new RuntimeException(message);
}

log.info("value {}", record.getValue());
Expand All @@ -67,17 +71,21 @@ public void write(Record<GenericObject> record) {
log.info("kvkey {}", keyValue.getKey());
log.info("kvvalue {}", keyValue.getValue());
}
log.info("value {}", record.getValue());
log.info("value schema type {}", record.getValue().getSchemaType());
log.info("value native object {} class {}", record.getValue().getNativeObject(), record.getValue().getNativeObject().getClass());

final GenericObject value = record.getValue();
log.info("value {}", value);
log.info("value schema type {}", value.getSchemaType());
log.info("value native object {} class {}", value.getNativeObject(), value.getNativeObject().getClass());

String expectedSchemaDefinition = record.getProperties().getOrDefault("expectedSchemaDefinition", "");
log.info("schemaDefinition {}", record.getSchema().getSchemaInfo().getSchemaDefinition());
log.info("expectedSchemaDefinition {}", expectedSchemaDefinition);
if (!expectedSchemaDefinition.isEmpty()) {
String schemaDefinition = record.getSchema().getSchemaInfo().getSchemaDefinition();
if (!expectedSchemaDefinition.equals(schemaDefinition)) {
throw new RuntimeException("Unexpected schema definition " + schemaDefinition + " is not " + expectedSchemaDefinition);
final String message = String.format(
"Unexpected schema definition %s is not %s", schemaDefinition, expectedSchemaDefinition);
throw new RuntimeException(message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
*/
package org.apache.pulsar.tests.integration.io;

import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

import java.util.HashMap;
import java.util.Map;

public class TestPropertySource implements Source<String> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
*/
package org.apache.pulsar.tests.integration.io;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

import java.nio.ByteBuffer;
import java.util.Map;

public class TestStateSink implements Sink<String> {

private SinkContext sinkContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
*/
package org.apache.pulsar.tests.integration.io;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

import java.nio.ByteBuffer;
import java.util.Map;

public class TestStateSource implements Source<String> {


private SourceContext sourceContext;
private int count;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;
Loading

0 comments on commit c72a0df

Please sign in to comment.