/*
 * Copyright Terracotta, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.ehcache.clustered.common.internal.messages;

import org.ehcache.clustered.common.Consistency;
import org.ehcache.clustered.common.PoolAllocation;
import org.ehcache.clustered.common.ServerSideConfiguration;
import org.ehcache.clustered.common.internal.ServerStoreConfiguration;
import org.terracotta.runnel.EnumMapping;
import org.terracotta.runnel.Struct;
import org.terracotta.runnel.StructBuilder;
import org.terracotta.runnel.decoding.Enm;
import org.terracotta.runnel.decoding.PrimitiveDecodingSupport;
import org.terracotta.runnel.decoding.StructArrayDecoder;
import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.encoding.PrimitiveEncodingSupport;
import org.terracotta.runnel.encoding.StructArrayEncoder;
import org.terracotta.runnel.encoding.StructEncoder;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static org.terracotta.runnel.EnumMappingBuilder.newEnumMappingBuilder;
import static org.terracotta.runnel.StructBuilder.newStructBuilder;

// Encodes and decodes configuration objects such as ServerSideConfiguration and ServerStoreConfiguration.
//
// This class is made extensible and hence must remain public.

/** * Encodes and decodes configuration objects such as {@link ServerSideConfiguration} and {@link ServerStoreConfiguration}. * <p> * This class is made extensible and hence must remain public. */
@SuppressWarnings("WeakerAccess") public class CommonConfigCodec implements ConfigCodec { private static final String STORE_CONFIG_KEY_TYPE_FIELD = "keyType"; private static final String STORE_CONFIG_KEY_SERIALIZER_TYPE_FIELD = "keySerializerType"; private static final String STORE_CONFIG_VALUE_TYPE_FIELD = "valueType"; private static final String STORE_CONFIG_VALUE_SERIALIZER_TYPE_FIELD = "valueSerializerType"; private static final String STORE_CONFIG_CONSISTENCY_FIELD = "consistency"; public static final String POOL_SIZE_FIELD = "poolSize"; public static final String POOL_RESOURCE_NAME_FIELD = "resourceName"; private static final String DEFAULT_RESOURCE_FIELD = "defaultResource"; private static final String POOLS_SUB_STRUCT = "pools"; private static final String POOL_NAME_FIELD = "poolName"; private static final String LOADER_WRITER_CONFIGURED_FIELD = "loaderWriterConfigured"; private static final String WRITE_BEHIND_CONFIGURED_FIELD = "writeBehindConfigured"; private static final EnumMapping<Consistency> CONSISTENCY_ENUM_MAPPING = newEnumMappingBuilder(Consistency.class) .mapping(Consistency.EVENTUAL, 1) .mapping(Consistency.STRONG, 2) .build(); private static final Struct POOLS_STRUCT = newStructBuilder() .string(POOL_NAME_FIELD, 10) .int64(POOL_SIZE_FIELD, 20) .string(POOL_RESOURCE_NAME_FIELD, 30).build(); @Override public InjectTuple injectServerStoreConfiguration(StructBuilder baseBuilder, final int index) { //this needs to be returned whenever the index for builder is changed, so that //other injecting places get the correct last index for adding structs to codec int lastIndexToReturn = index + 30; final StructBuilder structBuilder = baseBuilder.string(STORE_CONFIG_KEY_TYPE_FIELD, index) .string(STORE_CONFIG_KEY_SERIALIZER_TYPE_FIELD, index + 10) .string(STORE_CONFIG_VALUE_TYPE_FIELD, index + 11) .string(STORE_CONFIG_VALUE_SERIALIZER_TYPE_FIELD, index + 15) .enm(STORE_CONFIG_CONSISTENCY_FIELD, index + 16, CONSISTENCY_ENUM_MAPPING) 
.bool(LOADER_WRITER_CONFIGURED_FIELD, index + 17) .bool(WRITE_BEHIND_CONFIGURED_FIELD, index + 18) // keep poolsize and resource name last .int64(POOL_SIZE_FIELD, index + 20) .string(POOL_RESOURCE_NAME_FIELD, lastIndexToReturn); return new InjectTuple() { @Override public int getLastIndex() { return lastIndexToReturn; } @Override public StructBuilder getUpdatedBuilder() { return structBuilder; } }; } @Override public InjectTuple injectServerSideConfiguration(StructBuilder baseBuilder, final int index) { final StructBuilder structBuilder = baseBuilder.string(DEFAULT_RESOURCE_FIELD, index + 10) .structs(POOLS_SUB_STRUCT, index + 20, POOLS_STRUCT); return new InjectTuple() { @Override public int getLastIndex() { return index + 20; } @Override public StructBuilder getUpdatedBuilder() { return structBuilder; } }; } @Override public void encodeServerStoreConfiguration(PrimitiveEncodingSupport<?> encoder, ServerStoreConfiguration configuration) { encoder.string(STORE_CONFIG_KEY_TYPE_FIELD, configuration.getStoredKeyType()) .string(STORE_CONFIG_KEY_SERIALIZER_TYPE_FIELD, configuration.getKeySerializerType()) .string(STORE_CONFIG_VALUE_TYPE_FIELD, configuration.getStoredValueType()) .string(STORE_CONFIG_VALUE_SERIALIZER_TYPE_FIELD, configuration.getValueSerializerType()); if (configuration.getConsistency() != null) { encoder.enm(STORE_CONFIG_CONSISTENCY_FIELD, configuration.getConsistency()); } encoder.bool(LOADER_WRITER_CONFIGURED_FIELD, configuration.isLoaderWriterConfigured()); encoder.bool(WRITE_BEHIND_CONFIGURED_FIELD, configuration.isWriteBehindConfigured()); PoolAllocation poolAllocation = configuration.getPoolAllocation(); if (poolAllocation instanceof PoolAllocation.Dedicated) { PoolAllocation.Dedicated dedicatedPool = (PoolAllocation.Dedicated) poolAllocation; encoder.int64(POOL_SIZE_FIELD, dedicatedPool.getSize()); if (dedicatedPool.getResourceName() != null) { encoder.string(POOL_RESOURCE_NAME_FIELD, dedicatedPool.getResourceName()); } } else if (poolAllocation 
instanceof PoolAllocation.Shared) { encoder.string(POOL_RESOURCE_NAME_FIELD, ((PoolAllocation.Shared) poolAllocation).getResourcePoolName()); } } @Override public ServerStoreConfiguration decodeServerStoreConfiguration(PrimitiveDecodingSupport decoder) { String keyType = decoder.string(STORE_CONFIG_KEY_TYPE_FIELD); String keySerializer = decoder.string(STORE_CONFIG_KEY_SERIALIZER_TYPE_FIELD); String valueType = decoder.string(STORE_CONFIG_VALUE_TYPE_FIELD); String valueSerializer = decoder.string(STORE_CONFIG_VALUE_SERIALIZER_TYPE_FIELD); Enm<Consistency> consistencyEnm = decoder.enm(STORE_CONFIG_CONSISTENCY_FIELD); Consistency consistency = Consistency.EVENTUAL; if (consistencyEnm.isValid()) { consistency = consistencyEnm.get(); } Boolean loaderWriterConfigured = decoder.bool(LOADER_WRITER_CONFIGURED_FIELD); Boolean writeBehindConfigured = decoder.bool(WRITE_BEHIND_CONFIGURED_FIELD); Long poolSize = decoder.int64(POOL_SIZE_FIELD); String poolResource = decoder.string(POOL_RESOURCE_NAME_FIELD); PoolAllocation poolAllocation = new PoolAllocation.Unknown(); if (poolSize != null) { poolAllocation = new PoolAllocation.Dedicated(poolResource, poolSize); } else if (poolResource != null) { poolAllocation = new PoolAllocation.Shared(poolResource); } return new ServerStoreConfiguration(poolAllocation, keyType, valueType, keySerializer, valueSerializer, consistency, getNonNullBoolean(loaderWriterConfigured), getNonNullBoolean(writeBehindConfigured)); } private static Boolean getNonNullBoolean(Boolean loaderWriterConfigured) { return Optional.ofNullable(loaderWriterConfigured).orElse(false); } @Override public void encodeServerSideConfiguration(StructEncoder<?> encoder, ServerSideConfiguration configuration) { if (configuration.getDefaultServerResource() != null) { encoder.string(DEFAULT_RESOURCE_FIELD, configuration.getDefaultServerResource()); } if (!configuration.getResourcePools().isEmpty()) { StructArrayEncoder<? 
extends StructEncoder<?>> poolsEncoder = encoder.structs(POOLS_SUB_STRUCT); for (Map.Entry<String, ServerSideConfiguration.Pool> poolEntry : configuration.getResourcePools().entrySet()) { StructEncoder<?> poolEncoder = poolsEncoder.add(); poolEncoder.string(POOL_NAME_FIELD, poolEntry.getKey()) .int64(POOL_SIZE_FIELD, poolEntry.getValue().getSize()); if (poolEntry.getValue().getServerResource() != null) { poolEncoder.string(POOL_RESOURCE_NAME_FIELD, poolEntry.getValue().getServerResource()); } poolEncoder.end(); } poolsEncoder.end(); } } @Override public ServerSideConfiguration decodeServerSideConfiguration(StructDecoder<?> decoder) { String defaultResource = decoder.string(DEFAULT_RESOURCE_FIELD); HashMap<String, ServerSideConfiguration.Pool> resourcePools = new HashMap<>(); StructArrayDecoder<? extends StructDecoder<?>> poolsDecoder = decoder.structs(POOLS_SUB_STRUCT); if (poolsDecoder != null) { for (int i = 0; i < poolsDecoder.length(); i++) { StructDecoder<?> poolDecoder = poolsDecoder.next(); String poolName = poolDecoder.string(POOL_NAME_FIELD); Long poolSize = poolDecoder.int64(POOL_SIZE_FIELD); String poolResourceName = poolDecoder.string(POOL_RESOURCE_NAME_FIELD); if (poolResourceName == null) { resourcePools.put(poolName, new ServerSideConfiguration.Pool(poolSize)); } else { resourcePools.put(poolName, new ServerSideConfiguration.Pool(poolSize, poolResourceName)); } poolDecoder.end(); } } ServerSideConfiguration serverSideConfiguration; if (defaultResource == null) { serverSideConfiguration = new ServerSideConfiguration(resourcePools); } else { serverSideConfiguration = new ServerSideConfiguration(defaultResource, resourcePools); } return serverSideConfiguration; } }