/*
 * Copyright (C) 2016, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.ketch;

import static org.eclipse.jgit.internal.ketch.Proposal.State.ABORTED;
import static org.eclipse.jgit.internal.ketch.Proposal.State.EXECUTED;
import static org.eclipse.jgit.internal.ketch.Proposal.State.NEW;
import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.errors.MissingObjectException;
import org.eclipse.jgit.internal.storage.reftree.Command;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.PersonIdent;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.transport.PushCertificate;
import org.eclipse.jgit.transport.ReceiveCommand;
import org.eclipse.jgit.util.time.ProposedTimestamp;

A proposal to be applied in a Ketch system.

Pushing to a Ketch leader results in the leader making a proposal. The proposal includes the list of reference updates. The leader attempts to send the proposal to a quorum of replicas by pushing the proposal to a "staging" area under the refs/txn/stage/ namespace. If the proposal succeeds then the changes are durable and the leader can commit the proposal.

Proposals are executed by KetchLeader.queueProposal(Proposal), which runs them asynchronously in the background. Proposals are thread-safe futures allowing callers to await() for results or be notified by callback using addListener(Runnable).

/** * A proposal to be applied in a Ketch system. * <p> * Pushing to a Ketch leader results in the leader making a proposal. The * proposal includes the list of reference updates. The leader attempts to send * the proposal to a quorum of replicas by pushing the proposal to a "staging" * area under the {@code refs/txn/stage/} namespace. If the proposal succeeds * then the changes are durable and the leader can commit the proposal. * <p> * Proposals are executed by * {@link org.eclipse.jgit.internal.ketch.KetchLeader#queueProposal(Proposal)}, * which runs them asynchronously in the background. Proposals are thread-safe * futures allowing callers to {@link #await()} for results or be notified by * callback using {@link #addListener(Runnable)}. */
public class Proposal {
Current state of the proposal.
/** Current state of the proposal. */
public enum State {
Proposal has not yet been given to a KetchLeader.
/** Proposal has not yet been given to a {@link KetchLeader}. */
NEW(false),
Proposal was validated and has entered the queue, but a round containing this proposal has not started yet.
/** * Proposal was validated and has entered the queue, but a round * containing this proposal has not started yet. */
QUEUED(false),
Round containing the proposal has begun and is in progress.
/** Round containing the proposal has begun and is in progress. */
RUNNING(false),
Proposal was executed through a round. Individual results from Proposal.getCommands(), Command.getResult() explain the success or failure outcome.
/** * Proposal was executed through a round. Individual results from * {@link Proposal#getCommands()}, {@link Command#getResult()} explain * the success or failure outcome. */
EXECUTED(true),
Proposal was aborted and did not reach consensus.
/** Proposal was aborted and did not reach consensus. */
ABORTED(true); private final boolean done; private State(boolean done) { this.done = done; }
Returns:true if this is a terminal state.
/** @return true if this is a terminal state. */
public boolean isDone() { return done; } } private final List<Command> commands; private PersonIdent author; private String message; private PushCertificate pushCert; private List<ProposedTimestamp> timestamps; private final List<Runnable> listeners = new CopyOnWriteArrayList<>(); private final AtomicReference<State> state = new AtomicReference<>(NEW);
Create a proposal from a list of Ketch commands.
Params:
  • cmds – prepared list of commands.
/** * Create a proposal from a list of Ketch commands. * * @param cmds * prepared list of commands. */
public Proposal(List<Command> cmds) { commands = Collections.unmodifiableList(new ArrayList<>(cmds)); }
Create a proposal from a collection of received commands.
Params:
  • rw – walker to assist in preparing commands.
  • cmds – list of pending commands.
Throws:
/** * Create a proposal from a collection of received commands. * * @param rw * walker to assist in preparing commands. * @param cmds * list of pending commands. * @throws org.eclipse.jgit.errors.MissingObjectException * newId of a command is not found locally. * @throws java.io.IOException * local objects cannot be accessed. */
public Proposal(RevWalk rw, Collection<ReceiveCommand> cmds) throws MissingObjectException, IOException { commands = asCommandList(rw, cmds); } private static List<Command> asCommandList(RevWalk rw, Collection<ReceiveCommand> cmds) throws MissingObjectException, IOException { List<Command> commands = new ArrayList<>(cmds.size()); for (ReceiveCommand cmd : cmds) { commands.add(new Command(rw, cmd)); } return Collections.unmodifiableList(commands); }
Get commands from this proposal.
Returns:commands from this proposal.
/** * Get commands from this proposal. * * @return commands from this proposal. */
public Collection<Command> getCommands() { return commands; }
Get optional author of the proposal.
Returns:optional author of the proposal.
/** * Get optional author of the proposal. * * @return optional author of the proposal. */
@Nullable public PersonIdent getAuthor() { return author; }
Set the author for the proposal.
Params:
  • who – optional identity of the author of the proposal.
Returns:this
/** * Set the author for the proposal. * * @param who * optional identity of the author of the proposal. * @return {@code this} */
public Proposal setAuthor(@Nullable PersonIdent who) { author = who; return this; }
Get optional message for the commit log of the RefTree.
Returns:optional message for the commit log of the RefTree.
/** * Get optional message for the commit log of the RefTree. * * @return optional message for the commit log of the RefTree. */
@Nullable public String getMessage() { return message; }
Set the message to appear in the commit log of the RefTree.
Params:
  • msg – message text for the commit.
Returns:this
/** * Set the message to appear in the commit log of the RefTree. * * @param msg * message text for the commit. * @return {@code this} */
public Proposal setMessage(@Nullable String msg) { message = msg != null && !msg.isEmpty() ? msg : null; return this; }
Get optional certificate signing the references.
Returns:optional certificate signing the references.
/** * Get optional certificate signing the references. * * @return optional certificate signing the references. */
@Nullable public PushCertificate getPushCertificate() { return pushCert; }
Set the push certificate signing the references.
Params:
  • cert – certificate, may be null.
Returns:this
/** * Set the push certificate signing the references. * * @param cert * certificate, may be null. * @return {@code this} */
public Proposal setPushCertificate(@Nullable PushCertificate cert) { pushCert = cert; return this; }
Get timestamps that Ketch must block for.
Returns:timestamps that Ketch must block for. These may have been used as commit times inside the objects involved in the proposal.
/** * Get timestamps that Ketch must block for. * * @return timestamps that Ketch must block for. These may have been used as * commit times inside the objects involved in the proposal. */
public List<ProposedTimestamp> getProposedTimestamps() { if (timestamps != null) { return timestamps; } return Collections.emptyList(); }
Request the proposal to wait for the affected timestamps to resolve.
Params:
Returns:this.
/** * Request the proposal to wait for the affected timestamps to resolve. * * @param ts * a {@link org.eclipse.jgit.util.time.ProposedTimestamp} object. * @return {@code this}. */
public Proposal addProposedTimestamp(ProposedTimestamp ts) { if (timestamps == null) { timestamps = new ArrayList<>(4); } timestamps.add(ts); return this; }
Add a callback to be invoked when the proposal is done.

A proposal is done when it has entered either State.EXECUTED or State.ABORTED state. If the proposal is already done callback.run() is immediately invoked on the caller's thread.

Params:
  • callback – method to run after the proposal is done. The callback may be run on a Ketch system thread and should be completed quickly.
/** * Add a callback to be invoked when the proposal is done. * <p> * A proposal is done when it has entered either * {@link org.eclipse.jgit.internal.ketch.Proposal.State#EXECUTED} or * {@link org.eclipse.jgit.internal.ketch.Proposal.State#ABORTED} state. If * the proposal is already done {@code callback.run()} is immediately * invoked on the caller's thread. * * @param callback * method to run after the proposal is done. The callback may be * run on a Ketch system thread and should be completed quickly. */
public void addListener(Runnable callback) { boolean runNow = false; synchronized (state) { if (state.get().isDone()) { runNow = true; } else { listeners.add(callback); } } if (runNow) { callback.run(); } }
Set command result as OK.
/** Set command result as OK. */
void success() { for (Command c : commands) { if (c.getResult() == NOT_ATTEMPTED) { c.setResult(OK); } } notifyState(EXECUTED); }
Mark commands as "transaction aborted".
/** Mark commands as "transaction aborted". */
void abort() { Command.abort(commands, null); notifyState(ABORTED); }
Read the current state of the proposal.
Returns:read the current state of the proposal.
/** * Read the current state of the proposal. * * @return read the current state of the proposal. */
public State getState() { return state.get(); }
Whether the proposal was attempted
Returns:true if the proposal was attempted. A true value does not mean consensus was reached, only that the proposal was considered and will not be making any more progress beyond its current state.
/** * Whether the proposal was attempted * * @return {@code true} if the proposal was attempted. A true value does not * mean consensus was reached, only that the proposal was considered * and will not be making any more progress beyond its current * state. */
public boolean isDone() { return state.get().isDone(); }
Wait for the proposal to be attempted and isDone() to be true.
Throws:
/** * Wait for the proposal to be attempted and {@link #isDone()} to be true. * * @throws java.lang.InterruptedException * caller was interrupted before proposal executed. */
public void await() throws InterruptedException { synchronized (state) { while (!state.get().isDone()) { state.wait(); } } }
Wait for the proposal to be attempted and isDone() to be true.
Params:
  • wait – how long to wait.
  • unit – unit describing the wait time.
Throws:
Returns:true if the proposal is done; false if the method timed out.
/** * Wait for the proposal to be attempted and {@link #isDone()} to be true. * * @param wait * how long to wait. * @param unit * unit describing the wait time. * @return true if the proposal is done; false if the method timed out. * @throws java.lang.InterruptedException * caller was interrupted before proposal executed. */
public boolean await(long wait, TimeUnit unit) throws InterruptedException { synchronized (state) { if (state.get().isDone()) { return true; } state.wait(unit.toMillis(wait)); return state.get().isDone(); } }
Wait for the proposal to exit a state.
Params:
  • notIn – state the proposal should not be in to return.
  • wait – how long to wait.
  • unit – unit describing the wait time.
Throws:
Returns:true if the proposal exited the state; false on time out.
/** * Wait for the proposal to exit a state. * * @param notIn * state the proposal should not be in to return. * @param wait * how long to wait. * @param unit * unit describing the wait time. * @return true if the proposal exited the state; false on time out. * @throws java.lang.InterruptedException * caller was interrupted before proposal executed. */
public boolean awaitStateChange(State notIn, long wait, TimeUnit unit) throws InterruptedException { synchronized (state) { if (state.get() != notIn) { return true; } state.wait(unit.toMillis(wait)); return state.get() != notIn; } } void notifyState(State s) { synchronized (state) { state.set(s); state.notifyAll(); } if (s.isDone()) { for (Runnable callback : listeners) { callback.run(); } listeners.clear(); } }
{@inheritDoc}
/** {@inheritDoc} */
@Override public String toString() { StringBuilder s = new StringBuilder(); s.append("Ketch Proposal {\n"); //$NON-NLS-1$ s.append(" ").append(state.get()).append('\n'); //$NON-NLS-1$ if (author != null) { s.append(" author ").append(author).append('\n'); //$NON-NLS-1$ } if (message != null) { s.append(" message ").append(message).append('\n'); //$NON-NLS-1$ } for (Command c : commands) { s.append(" "); //$NON-NLS-1$ format(s, c.getOldRef(), "CREATE"); //$NON-NLS-1$ s.append(' '); format(s, c.getNewRef(), "DELETE"); //$NON-NLS-1$ s.append(' ').append(c.getRefName()); if (c.getResult() != ReceiveCommand.Result.NOT_ATTEMPTED) { s.append(' ').append(c.getResult()); // $NON-NLS-1$ } s.append('\n'); } s.append('}'); return s.toString(); } private static void format(StringBuilder s, @Nullable Ref r, String n) { if (r == null) { s.append(n); } else if (r.isSymbolic()) { s.append(r.getTarget().getName()); } else { ObjectId id = r.getObjectId(); if (id != null) { s.append(id.abbreviate(8).name()); } } } }