Skip to content

Commit 239a095

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents eca0020 + 1755268 commit 239a095

File tree

56 files changed

+1750
-1825
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1750
-1825
lines changed

samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
package org.apache.samza.metadatastore;
2020

2121
import org.apache.samza.annotation.InterfaceStability;
22-
import org.apache.samza.config.Config;
23-
import org.apache.samza.metrics.MetricsRegistry;
2422
import java.util.Map;
2523

2624
/**
@@ -32,10 +30,8 @@ public interface MetadataStore {
3230
/**
3331
* Initializes the metadata store, if applicable, setting up the underlying resources
3432
* and connections to the store endpoints.
35-
*
36-
* @param config the configuration for instantiating the MetadataStore.
3733
*/
38-
void init(Config config, MetricsRegistry metricsRegistry);
34+
void init();
3935

4036
/**
4137
* Gets the value associated with the specified {@code key}.

samza-core/src/main/java/org/apache/samza/container/LocalityManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public LocalityManager(Config config, MetricsRegistry metricsRegistry) {
7878
this.config = config;
7979
MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
8080
this.metadataStore = metadataStoreFactory.getMetadataStore(SetContainerHostMapping.TYPE, config, metricsRegistry);
81-
this.metadataStore.init(config, metricsRegistry);
81+
this.metadataStore.init();
8282
this.keySerde = keySerde;
8383
this.valueSerde = valueSerde;
8484
this.taskAssignmentManager = new TaskAssignmentManager(config, metricsRegistry, keySerde, valueSerde);

samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public Set<ContainerModel> group(Set<TaskModel> tasks) {
7878
// Convert to a Set of ContainerModel
7979
Set<ContainerModel> containerModels = new HashSet<>();
8080
for (int i = 0; i < containerCount; i++) {
81-
containerModels.add(new ContainerModel(String.valueOf(i), i, taskGroups[i]));
81+
containerModels.add(new ContainerModel(String.valueOf(i), taskGroups[i]));
8282
}
8383

8484
return Collections.unmodifiableSet(containerModels);
@@ -194,7 +194,7 @@ private List<TaskGroup> getPreviousContainers(TaskAssignmentManager taskAssignme
194194
private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) {
195195
for (ContainerModel container : containers) {
196196
for (TaskName taskName : container.getTasks().keySet()) {
197-
taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getProcessorId());
197+
taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getId());
198198
}
199199
}
200200
}
@@ -301,7 +301,7 @@ private Set<ContainerModel> buildContainerModels(Set<TaskModel> tasks, List<Task
301301
containerTaskModels.put(model.getTaskName(), model);
302302
}
303303
containerModels.add(
304-
new ContainerModel(container.containerId, Integer.valueOf(container.containerId), containerTaskModels));
304+
new ContainerModel(container.containerId, containerTaskModels));
305305
}
306306
return Collections.unmodifiableSet(containerModels);
307307
}

samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,7 @@ public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersId
100100
// Convert to a Set of ContainerModel
101101
Set<ContainerModel> containerModels = new HashSet<>();
102102
for (int i = 0; i < containerCount; i++) {
103-
// containerId in ContainerModel constructor is set to -1 because processorId can be any string and does
104-
// not have an integer equivalent. So, we set it to -1. After 0.13, this parameter will be removed.
105-
containerModels.add(new ContainerModel(containersIds.get(i), -1, taskGroups[i]));
103+
containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i]));
106104
}
107105

108106
return Collections.unmodifiableSet(containerModels);

samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public Set<ContainerModel> group(Set<TaskModel> taskModels) {
5050
for (TaskModel taskModel: taskModels) {
5151
taskNameTaskModelMap.put(taskModel.getTaskName(), taskModel);
5252
}
53-
ContainerModel containerModel = new ContainerModel(containerId, -1, taskNameTaskModelMap);
53+
ContainerModel containerModel = new ContainerModel(containerId, taskNameTaskModelMap);
5454
return Collections.singleton(containerModel);
5555
}
5656
}

samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry, Ser
8484
}
8585

8686
public void init(Config config, MetricsRegistry metricsRegistry) {
87-
this.metadataStore.init(config, metricsRegistry);
87+
this.metadataStore.init();
8888
}
8989

9090
/**

samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public CoordinatorStreamStore(String namespace, Config config, MetricsRegistry m
8989
}
9090

9191
@Override
92-
public void init(Config config, MetricsRegistry metricsRegistry) {
92+
public void init() {
9393
if (isInitialized.compareAndSet(false, true)) {
9494
LOG.info("Starting the coordinator stream system consumer with config: {}.", config);
9595
registerConsumer();

samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919

2020
package org.apache.samza.job.model;
2121

22-
import org.apache.samza.container.TaskName;
23-
2422
import java.util.Collections;
2523
import java.util.Map;
24+
import org.apache.samza.container.TaskName;
2625

2726
/**
2827
* <p>
@@ -41,28 +40,16 @@
4140
* </p>
4241
*/
4342
public class ContainerModel {
44-
@Deprecated
45-
private final int containerId;
46-
private final String processorId;
43+
private final String id;
4744
private final Map<TaskName, TaskModel> tasks;
4845

49-
public ContainerModel(String processorId, int containerId, Map<TaskName, TaskModel> tasks) {
50-
this.containerId = containerId;
51-
if (processorId == null) {
52-
this.processorId = String.valueOf(containerId);
53-
} else {
54-
this.processorId = processorId;
55-
}
46+
public ContainerModel(String id, Map<TaskName, TaskModel> tasks) {
47+
this.id = id;
5648
this.tasks = Collections.unmodifiableMap(tasks);
5749
}
5850

59-
@Deprecated
60-
public int getContainerId() {
61-
return containerId;
62-
}
63-
64-
public String getProcessorId() {
65-
return processorId;
51+
public String getId() {
52+
return id;
6653
}
6754

6855
public Map<TaskName, TaskModel> getTasks() {
@@ -71,14 +58,14 @@ public Map<TaskName, TaskModel> getTasks() {
7158

7259
@Override
7360
public String toString() {
74-
return "ContainerModel [processorId=" + processorId + ", tasks=" + tasks + "]";
61+
return "ContainerModel [id=" + id + ", tasks=" + tasks + "]";
7562
}
7663

7764
@Override
7865
public int hashCode() {
7966
final int prime = 31;
8067
int result = 1;
81-
result = prime * result + ((processorId == null) ? 0 : processorId.hashCode());
68+
result = prime * result + ((id == null) ? 0 : id.hashCode());
8269
result = prime * result + ((tasks == null) ? 0 : tasks.hashCode());
8370
return result;
8471
}
@@ -92,7 +79,7 @@ public boolean equals(Object obj) {
9279
if (getClass() != obj.getClass())
9380
return false;
9481
ContainerModel other = (ContainerModel) obj;
95-
if (!processorId.equals(other.processorId))
82+
if (!id.equals(other.id))
9683
return false;
9784
if (tasks == null) {
9885
if (other.tasks != null)

samza-core/src/main/java/org/apache/samza/job/model/JobModel.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.samza.config.Config;
2626
import org.apache.samza.container.LocalityManager;
2727
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
28-
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
2928

3029
/**
3130
* <p>
@@ -39,7 +38,6 @@
3938
* an id, partition information, etc.
4039
* </p>
4140
*/
42-
@JsonIgnoreProperties(ignoreUnknown = true)
4341
public class JobModel {
4442
private static final String EMPTY_STRING = "";
4543
private final Config config;

samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,35 @@
1919

2020
package org.apache.samza.serializers.model;
2121

22+
import java.util.Map;
2223
import org.apache.samza.container.TaskName;
2324
import org.apache.samza.job.model.TaskModel;
24-
import org.codehaus.jackson.annotate.JsonCreator;
2525
import org.codehaus.jackson.annotate.JsonProperty;
2626

27-
import java.util.Map;
28-
2927
/**
30-
* A mix-in Jackson class to convert Samza's ContainerModel to/from JSON.
28+
* A mix-in Jackson class to convert {@link org.apache.samza.job.model.ContainerModel} to JSON.
29+
* Notes:
30+
* 1) Constructor is not needed because this mixin is not used for deserialization. See {@link SamzaObjectMapper}.
31+
* 2) It is unnecessary to use {@link org.codehaus.jackson.annotate.JsonIgnoreProperties#ignoreUnknown()} here since
32+
* {@link SamzaObjectMapper} already uses custom deserialization code for the
33+
* {@link org.apache.samza.job.model.ContainerModel}.
34+
* 3) See {@link SamzaObjectMapper} for more context about why the JSON keys are named in this specified way.
3135
*/
3236
public abstract class JsonContainerModelMixIn {
33-
@JsonCreator
34-
public JsonContainerModelMixIn(@JsonProperty("processor-id") String processorId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
35-
}
36-
37-
@Deprecated
38-
@JsonProperty("container-id")
39-
abstract int getContainerId();
40-
41-
@JsonProperty("processor-id")
42-
abstract String getProcessorId();
43-
44-
@JsonProperty("tasks")
37+
/**
38+
* This is intentionally not "id" for backwards compatibility reasons. See {@link SamzaObjectMapper} for more details.
39+
*/
40+
static final String PROCESSOR_ID_KEY = "processor-id";
41+
/**
42+
* This is used for backwards compatibility. See {@link SamzaObjectMapper} for more details.
43+
*/
44+
static final String CONTAINER_ID_KEY = "container-id";
45+
static final String TASKS_KEY = "tasks";
46+
47+
@JsonProperty(PROCESSOR_ID_KEY)
48+
abstract String getId();
49+
50+
@JsonProperty(TASKS_KEY)
4551
abstract Map<TaskName, TaskModel> getTasks();
4652
}
4753

0 commit comments

Comments
 (0)