package com.datastax.oss.driver.internal.core.session;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
import com.datastax.oss.driver.internal.core.context.EventBus;
import com.datastax.oss.driver.internal.core.metadata.schema.events.AggregateChangeEvent;
import com.datastax.oss.driver.internal.core.metadata.schema.events.FunctionChangeEvent;
import com.datastax.oss.driver.internal.core.metadata.schema.events.KeyspaceChangeEvent;
import com.datastax.oss.driver.internal.core.metadata.schema.events.TableChangeEvent;
import com.datastax.oss.driver.internal.core.metadata.schema.events.TypeChangeEvent;
import com.datastax.oss.driver.internal.core.metadata.schema.events.ViewChangeEvent;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import io.netty.util.concurrent.EventExecutor;
import net.jcip.annotations.ThreadSafe;
@ThreadSafe
class SchemaListenerNotifier {
private final SchemaChangeListener listener;
private final EventExecutor adminExecutor;
SchemaListenerNotifier(
SchemaChangeListener listener, EventBus eventBus, EventExecutor adminExecutor) {
this.listener = listener;
this.adminExecutor = adminExecutor;
eventBus.register(
AggregateChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onAggregateChangeEvent));
eventBus.register(
FunctionChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onFunctionChangeEvent));
eventBus.register(
KeyspaceChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onKeyspaceChangeEvent));
eventBus.register(
TableChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onTableChangeEvent));
eventBus.register(
TypeChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onTypeChangeEvent));
eventBus.register(
ViewChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onViewChangeEvent));
}
private void onAggregateChangeEvent(AggregateChangeEvent event) {
assert adminExecutor.inEventLoop();
switch (event.changeType) {
case CREATED:
listener.onAggregateCreated(event.newAggregate);
break;
case UPDATED:
listener.onAggregateUpdated(event.newAggregate, event.oldAggregate);
break;
case DROPPED:
listener.onAggregateDropped(event.oldAggregate);
break;
}
}
private void onFunctionChangeEvent(FunctionChangeEvent event) {
assert adminExecutor.inEventLoop();
switch (event.changeType) {
case CREATED:
listener.onFunctionCreated(event.newFunction);
break;
case UPDATED:
listener.onFunctionUpdated(event.newFunction, event.oldFunction);
break;
case DROPPED:
listener.onFunctionDropped(event.oldFunction);
break;
}
}
private void onKeyspaceChangeEvent(KeyspaceChangeEvent event) {
assert adminExecutor.inEventLoop();
switch (event.changeType) {
case CREATED:
listener.onKeyspaceCreated(event.newKeyspace);
break;
case UPDATED:
listener.onKeyspaceUpdated(event.newKeyspace, event.oldKeyspace);
break;
case DROPPED:
listener.onKeyspaceDropped(event.oldKeyspace);
break;
}
}
private void onTableChangeEvent(TableChangeEvent event) {
assert adminExecutor.inEventLoop();
switch (event.changeType) {
case CREATED:
listener.onTableCreated(event.newTable);
break;
case UPDATED:
listener.onTableUpdated(event.newTable, event.oldTable);
break;
case DROPPED:
listener.onTableDropped(event.oldTable);
break;
}
}
private void onTypeChangeEvent(TypeChangeEvent event) {
assert adminExecutor.inEventLoop();
switch (event.changeType) {
case CREATED:
listener.onUserDefinedTypeCreated(event.newType);
break;
case UPDATED:
listener.onUserDefinedTypeUpdated(event.newType, event.oldType);
break;
case DROPPED:
listener.onUserDefinedTypeDropped(event.oldType);
break;
}
}
private void onViewChangeEvent(ViewChangeEvent event) {
assert adminExecutor.inEventLoop();
switch (event.changeType) {
case CREATED:
listener.onViewCreated(event.newView);
break;
case UPDATED:
listener.onViewUpdated(event.newView, event.oldView);
break;
case DROPPED:
listener.onViewDropped(event.oldView);
break;
}
}
}