/*
 * Copyright (C) 2016, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.ketch;

import static org.eclipse.jgit.internal.ketch.KetchConstants.ACCEPTED;
import static org.eclipse.jgit.internal.ketch.KetchConstants.COMMITTED;
import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_KEY_TYPE;
import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_SECTION_KETCH;
import static org.eclipse.jgit.internal.ketch.KetchConstants.DEFAULT_TXN_NAMESPACE;
import static org.eclipse.jgit.internal.ketch.KetchConstants.STAGE;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_NAME;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_REMOTE;

import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.PersonIdent;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.URIish;
import org.eclipse.jgit.util.time.MonotonicClock;
import org.eclipse.jgit.util.time.MonotonicSystemClock;
import org.eclipse.jgit.util.time.ProposedTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Ketch system-wide configuration.

This class provides useful defaults for testing and small proof of concepts. Full scale installations are expected to subclass and override methods to provide consistent configuration across all managed repositories.

Servers should configure their own ScheduledExecutorService.

/** * Ketch system-wide configuration. * <p> * This class provides useful defaults for testing and small proof of concepts. * Full scale installations are expected to subclass and override methods to * provide consistent configuration across all managed repositories. * <p> * Servers should configure their own * {@link java.util.concurrent.ScheduledExecutorService}. */
public class KetchSystem { private static final Random RNG = new Random();
Get default executor, one thread per available processor.
Returns:default executor, one thread per available processor.
/** * Get default executor, one thread per available processor. * * @return default executor, one thread per available processor. */
public static ScheduledExecutorService defaultExecutor() { return DefaultExecutorHolder.I; } private final ScheduledExecutorService executor; private final MonotonicClock clock; private final String txnNamespace; private final String txnAccepted; private final String txnCommitted; private final String txnStage;
Create a default system with a thread pool of 1 thread per CPU.
/** * Create a default system with a thread pool of 1 thread per CPU. */
public KetchSystem() { this(defaultExecutor(), new MonotonicSystemClock(), DEFAULT_TXN_NAMESPACE); }
Create a Ketch system with the provided executor service.
Params:
  • executor – thread pool to run background operations.
  • clock – clock to create timestamps.
  • txnNamespace – reference namespace for the RefTree graph and associated transaction state. Must begin with "refs/" and end with '/', for example "refs/txn/".
/** * Create a Ketch system with the provided executor service. * * @param executor * thread pool to run background operations. * @param clock * clock to create timestamps. * @param txnNamespace * reference namespace for the RefTree graph and associated * transaction state. Must begin with {@code "refs/"} and end * with {@code '/'}, for example {@code "refs/txn/"}. */
public KetchSystem(ScheduledExecutorService executor, MonotonicClock clock, String txnNamespace) { this.executor = executor; this.clock = clock; this.txnNamespace = txnNamespace; this.txnAccepted = txnNamespace + ACCEPTED; this.txnCommitted = txnNamespace + COMMITTED; this.txnStage = txnNamespace + STAGE; }
Get executor to perform background operations.
Returns:executor to perform background operations.
/** * Get executor to perform background operations. * * @return executor to perform background operations. */
public ScheduledExecutorService getExecutor() { return executor; }
Get clock to obtain timestamps from.
Returns:clock to obtain timestamps from.
/** * Get clock to obtain timestamps from. * * @return clock to obtain timestamps from. */
public MonotonicClock getClock() { return clock; }
Get how long the leader will wait for the getClock()'s ProposedTimestamp used in commits proposed to the RefTree graph (getTxnAccepted())
Returns:how long the leader will wait for the getClock()'s ProposedTimestamp used in commits proposed to the RefTree graph (getTxnAccepted()). Defaults to 5 seconds.
/** * Get how long the leader will wait for the {@link #getClock()}'s * {@code ProposedTimestamp} used in commits proposed to the RefTree graph * ({@link #getTxnAccepted()}) * * @return how long the leader will wait for the {@link #getClock()}'s * {@code ProposedTimestamp} used in commits proposed to the RefTree * graph ({@link #getTxnAccepted()}). Defaults to 5 seconds. */
public Duration getMaxWaitForMonotonicClock() { return Duration.ofSeconds(5); }
Whether elections should require monotonically increasing commit timestamps
Returns:true if elections should require monotonically increasing commit timestamps. This requires a very good MonotonicClock.
/** * Whether elections should require monotonically increasing commit * timestamps * * @return {@code true} if elections should require monotonically increasing * commit timestamps. This requires a very good * {@link org.eclipse.jgit.util.time.MonotonicClock}. */
public boolean requireMonotonicLeaderElections() { return false; }
Get the namespace used for the RefTree graph and transaction management.
Returns:reference namespace such as "refs/txn/".
/** * Get the namespace used for the RefTree graph and transaction management. * * @return reference namespace such as {@code "refs/txn/"}. */
public String getTxnNamespace() { return txnNamespace; }
Get name of the accepted RefTree graph.
Returns:name of the accepted RefTree graph.
/** * Get name of the accepted RefTree graph. * * @return name of the accepted RefTree graph. */
public String getTxnAccepted() { return txnAccepted; }
Get name of the committed RefTree graph.
Returns:name of the committed RefTree graph.
/** * Get name of the committed RefTree graph. * * @return name of the committed RefTree graph. */
public String getTxnCommitted() { return txnCommitted; }
Get prefix for staged objects, e.g. "refs/txn/stage/".
Returns:prefix for staged objects, e.g. "refs/txn/stage/".
/** * Get prefix for staged objects, e.g. {@code "refs/txn/stage/"}. * * @return prefix for staged objects, e.g. {@code "refs/txn/stage/"}. */
public String getTxnStage() { return txnStage; }
Create new committer PersonIdent for ketch system
Params:
  • time – timestamp for the committer.
Returns:identity line for the committer header of a RefTreeGraph.
/** * Create new committer {@code PersonIdent} for ketch system * * @param time * timestamp for the committer. * @return identity line for the committer header of a RefTreeGraph. */
public PersonIdent newCommitter(ProposedTimestamp time) { String name = "ketch"; //$NON-NLS-1$ String email = "ketch@system"; //$NON-NLS-1$ return new PersonIdent(name, email, time); }
Construct a random tag to identify a candidate during leader election.

Multiple processes trying to elect themselves leaders at exactly the same time (rounded to seconds) using the same newCommitter(ProposedTimestamp) identity strings, for the same term, may generate the same ObjectId for the election commit and falsely assume they have both won.

Candidates add this tag to their election ballot commit to disambiguate the election. The tag only needs to be unique for a given triplet of newCommitter(ProposedTimestamp), system time (rounded to seconds), and term. If every replica in the system uses a unique newCommitter (such as including the host name after the "@" in the email address) the tag could be the empty string.

The default implementation generates a few bytes of random data.

Returns:unique tag; null or empty string if newCommitter() is sufficiently unique to identify the leader.
/** * Construct a random tag to identify a candidate during leader election. * <p> * Multiple processes trying to elect themselves leaders at exactly the same * time (rounded to seconds) using the same * {@link #newCommitter(ProposedTimestamp)} identity strings, for the same * term, may generate the same ObjectId for the election commit and falsely * assume they have both won. * <p> * Candidates add this tag to their election ballot commit to disambiguate * the election. The tag only needs to be unique for a given triplet of * {@link #newCommitter(ProposedTimestamp)}, system time (rounded to * seconds), and term. If every replica in the system uses a unique * {@code newCommitter} (such as including the host name after the * {@code "@"} in the email address) the tag could be the empty string. * <p> * The default implementation generates a few bytes of random data. * * @return unique tag; null or empty string if {@code newCommitter()} is * sufficiently unique to identify the leader. */
@Nullable public String newLeaderTag() { int n = RNG.nextInt(1 << (6 * 4)); return String.format("%06x", Integer.valueOf(n)); //$NON-NLS-1$ }
Construct the KetchLeader instance of a repository.
Params:
  • repo – local repository stored by the leader.
Throws:
Returns:leader instance.
/** * Construct the KetchLeader instance of a repository. * * @param repo * local repository stored by the leader. * @return leader instance. * @throws java.net.URISyntaxException * a follower configuration contains an unsupported URI. */
public KetchLeader createLeader(Repository repo) throws URISyntaxException { KetchLeader leader = new KetchLeader(this) { @Override protected Repository openRepository() { repo.incrementOpen(); return repo; } }; leader.setReplicas(createReplicas(leader, repo)); return leader; }
Get the collection of replicas for a repository.

The collection of replicas must include the local repository.

Params:
  • leader – the leader driving these replicas.
  • repo – repository to get the replicas of.
Throws:
Returns:collection of replicas for the specified repository.
/** * Get the collection of replicas for a repository. * <p> * The collection of replicas must include the local repository. * * @param leader * the leader driving these replicas. * @param repo * repository to get the replicas of. * @return collection of replicas for the specified repository. * @throws java.net.URISyntaxException * a configured URI is invalid. */
protected List<KetchReplica> createReplicas(KetchLeader leader, Repository repo) throws URISyntaxException { List<KetchReplica> replicas = new ArrayList<>(); Config cfg = repo.getConfig(); String localName = getLocalName(cfg); for (String name : cfg.getSubsections(CONFIG_KEY_REMOTE)) { if (!hasParticipation(cfg, name)) { continue; } ReplicaConfig kc = ReplicaConfig.newFromConfig(cfg, name); if (name.equals(localName)) { replicas.add(new LocalReplica(leader, name, kc)); continue; } RemoteConfig rc = new RemoteConfig(cfg, name); List<URIish> uris = rc.getPushURIs(); if (uris.isEmpty()) { uris = rc.getURIs(); } for (URIish uri : uris) { String n = uris.size() == 1 ? name : uri.getHost(); replicas.add(new RemoteGitReplica(leader, n, uri, kc, rc)); } } return replicas; } private static boolean hasParticipation(Config cfg, String name) { return cfg.getString(CONFIG_KEY_REMOTE, name, CONFIG_KEY_TYPE) != null; } private static String getLocalName(Config cfg) { return cfg.getString(CONFIG_SECTION_KETCH, null, CONFIG_KEY_NAME); } static class DefaultExecutorHolder { private static final Logger log = LoggerFactory.getLogger(KetchSystem.class); static final ScheduledExecutorService I = create(); private static ScheduledExecutorService create() { int cores = Runtime.getRuntime().availableProcessors(); int threads = Math.max(5, cores); log.info("Using {} threads", Integer.valueOf(threads)); //$NON-NLS-1$ return Executors.newScheduledThreadPool( threads, new ThreadFactory() { private final AtomicInteger threadCnt = new AtomicInteger(); @Override public Thread newThread(Runnable r) { int id = threadCnt.incrementAndGet(); Thread thr = new Thread(r); thr.setName("KetchExecutor-" + id); //$NON-NLS-1$ return thr; } }); } private DefaultExecutorHolder() { } }
Compute a delay in a min..max interval with random jitter.
Params:
  • last – amount of delay waited before the last attempt. This is used to seed the next delay interval. Should be 0 if there was no prior delay.
  • min – shortest amount of allowable delay between attempts.
  • max – longest amount of allowable delay between attempts.
Returns:new amount of delay to wait before the next attempt.
/** * Compute a delay in a {@code min..max} interval with random jitter. * * @param last * amount of delay waited before the last attempt. This is used * to seed the next delay interval. Should be 0 if there was no * prior delay. * @param min * shortest amount of allowable delay between attempts. * @param max * longest amount of allowable delay between attempts. * @return new amount of delay to wait before the next attempt. */
static long delay(long last, long min, long max) { long r = Math.max(0, last * 3 - min); if (r > 0) { int c = (int) Math.min(r + 1, Integer.MAX_VALUE); r = RNG.nextInt(c); } return Math.max(Math.min(min + r, max), min); } }