Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.optimizing.dra.DynamicAllocationConfig;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
import org.apache.amoro.server.persistence.mapper.ResourceMapper;
Expand Down Expand Up @@ -840,19 +841,7 @@ public int compareTo(@NotNull Delayed o) {
}

public int getMinParallelism(ResourceGroup resourceGroup) {
if (!resourceGroup
.getProperties()
.containsKey(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM)) {
return 0;
}
String minParallelism =
resourceGroup.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM);
try {
return Integer.parseInt(minParallelism);
} catch (Throwable t) {
LOG.warn("Illegal minParallelism : {}, will use default value 0", minParallelism, t);
return 0;
}
return DynamicAllocationConfig.resolveMinParallelism(resourceGroup);
}

public int tryKeeping(ResourceGroup resourceGroup) {
Expand Down Expand Up @@ -921,6 +910,17 @@ protected void processTask(OptimizerGroupKeepingTask keepingTask) {

if (keepingTask.getAttempts() > groupMaxKeepingAttempts) {
int minParallelism = keepingTask.getMinParallelism(resourceGroup);
if (DynamicAllocationConfig.isEffectivelyEnabled(resourceGroup)) {
// Dynamic allocation owns scale decisions for the group; never erode its
// min-parallelism floor automatically.
LOG.warn(
"Resource Group:{}, creating optimizer {} times in a row, optimizers still below min-parallel:{}; dynamic allocation is enabled so min-parallel is kept",
resourceGroup.getName(),
keepingTask.getAttempts(),
minParallelism);
keepInTouch(resourceGroup.getName(), 1);
return;
}
LOG.warn(
"Resource Group:{}, creating optimizer {} times in a row, optimizers still below min-parallel:{}, will reset min-parallel to {}",
resourceGroup.getName(),
Expand All @@ -930,7 +930,7 @@ protected void processTask(OptimizerGroupKeepingTask keepingTask) {
resourceGroup
.getProperties()
.put(
OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM,
DynamicAllocationConfig.effectiveMinParallelismKey(resourceGroup),
String.valueOf(minParallelism - requiredCores));
updateResourceGroup(resourceGroup);
optimizerManager.updateResourceGroup(resourceGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.amoro.server.dashboard.utils.PropertiesUtil;
import org.apache.amoro.server.manager.AbstractOptimizerContainer;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.dra.DynamicAllocationConfig;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.Containers;
import org.apache.amoro.server.resource.OptimizerInstance;
Expand Down Expand Up @@ -242,7 +243,9 @@ public void createResourceGroup(Context ctx) {
validateGroupName(name);
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.createResourceGroup(builder.build());
ResourceGroup resourceGroup = builder.build();
validateDynamicAllocation(resourceGroup);
optimizerManager.createResourceGroup(resourceGroup);
ctx.json(OkResponse.of("The optimizer group has been successfully created."));
}

Expand All @@ -257,7 +260,9 @@ public void updateResourceGroup(Context ctx) {
Map<String, String> properties = PropertiesUtil.sanitizeProperties((Map) map.get("properties"));
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.updateResourceGroup(builder.build());
ResourceGroup resourceGroup = builder.build();
validateDynamicAllocation(resourceGroup);
optimizerManager.updateResourceGroup(resourceGroup);
ctx.json(OkResponse.of("The optimizer group has been successfully updated."));
}

Expand All @@ -283,6 +288,15 @@ public void getContainers(Context ctx) {
.collect(Collectors.toList())));
}

private void validateDynamicAllocation(ResourceGroup resourceGroup) {
DynamicAllocationConfig.warnDeprecatedMinParallelism(resourceGroup);
try {
DynamicAllocationConfig.parse(resourceGroup).validate();
} catch (IllegalArgumentException e) {
throw new BadRequestException(e.getMessage());
}
}

private void validateGroupName(String groupName) {
if (StringUtils.isEmpty(groupName)) {
throw new BadRequestException(
Expand Down
Loading
Loading