001package com.pingidentity.ds.plugin;
002
003import com.unboundid.directory.sdk.common.api.ServerThread;
004import com.unboundid.directory.sdk.common.types.LogSeverity;
005import com.unboundid.directory.sdk.common.types.ServerContext;
006import com.unboundid.ldap.sdk.*;
007import com.unboundid.util.FixedRateBarrier;
008
009import java.util.concurrent.atomic.AtomicLong;
010
011/**
012 * This class provides facilities to handle the Snapshot Changelog change number in a consistent and resilient manner
013 */
014public class SnapshotChangeNumberPersister implements ServerThread
015{
016    private final ServerContext serverContext;
017    private final SnapshotChangelog snapshotChanglog;
018    private AtomicLong changeNumber;
019    private static final String STATE_CLASS = "state";
020    private static final Long INITIAL_CHANGE_NUMBER = 0L;
021    private DN baseDN;
022    private FixedRateBarrier barrier = new FixedRateBarrier(1000L,3);
023    private static final String ATTR_STATE_CHANGE_NUMBER = "scl-last-change-number";
024    Long lastPersistedChangeNumber = 0L;
025    
026    /**
027     * Performs the necessary processing to generate the DN of the entry containing the state of the enhanced changelog
028     *
029     * @return a String with the DN of the changelog state entry
030     */
031    private String getStateEntryDN()
032    {
033        return "uid=state," + baseDN;
034    }
035    
036    /**
037     * This constructor either loads the current changelog number value or create a new state entry
038     * @param snapshotChangelog the snapshotchangelog object
039     * @throws LDAPException if an error is encountered
040     */
041    public SnapshotChangeNumberPersister(SnapshotChangelog snapshotChangelog) throws LDAPException
042    {
043        this.snapshotChanglog = snapshotChangelog;
044        this.baseDN=snapshotChangelog.baseDN;
045        this.serverContext = snapshotChangelog.serverContext;
046        SearchResultEntry entry = null;
047
048        Long persistedChangeNumber = INITIAL_CHANGE_NUMBER;
049        try
050        {
051            entry = serverContext.getInternalRootConnection().getEntry(baseDN.toNormalizedString());
052        } catch (LDAPException ldape)
053        {
054            serverContext.debugCaught(ldape);
055        }
056        if (entry == null)
057        {
058            Entry rootEntry = new Entry(baseDN);
059            rootEntry.addAttribute(SnapshotChangelog.OBJECT_CLASS, "javaContainer", "extensibleObject");
060            for (Attribute attribute : baseDN.getRDN().getAttributes())
061            {
062                rootEntry.addAttribute(attribute);
063            }
064            serverContext.getInternalRootConnection().add(rootEntry);
065        }
066    
067        entry = serverContext.getInternalRootConnection().getEntry(getStateEntryDN());
068        if (entry != null)
069        {
070            Long candidateCn = entry.getAttributeValueAsLong(ATTR_STATE_CHANGE_NUMBER);
071            if (candidateCn != null && candidateCn > INITIAL_CHANGE_NUMBER)
072            {
073                persistedChangeNumber = candidateCn;
074            }
075        } else
076        {
077            Entry stateEntry = new Entry(getStateEntryDN());
078            stateEntry.addAttribute(SnapshotChangelog.OBJECT_CLASS, SnapshotChangelog.CHANGELOG_PREFIX + STATE_CLASS);
079            stateEntry.addAttribute(ATTR_STATE_CHANGE_NUMBER, String.valueOf(persistedChangeNumber));
080            serverContext.getInternalRootConnection().add(stateEntry);
081        }
082        this.changeNumber = new AtomicLong(persistedChangeNumber);
083        Long fallForwardNumber = changeNumber.get();
084
085        // Fall-forward: guard against the last persisted change-number to be "behind"
086        Entry fallForwardEntry = null;
087        do
088        {
089            try
090            {
091                fallForwardEntry = serverContext.getInternalRootConnection().getEntry(snapshotChangelog.getChangelogEntryDN(++fallForwardNumber));
092            } catch (LDAPException e)
093            {
094                fallForwardEntry=null;
095                if (ResultCode.NO_SUCH_OBJECT.equals(e.getResultCode()))
096                {
097                    // the last entry was found
098                    if ( changeNumber.get() != fallForwardNumber)
099                    {
100                        changeNumber = new AtomicLong(fallForwardNumber);
101                    }
102                }
103            }
104        } while( fallForwardEntry != null );
105    }
106    
107    /**
108     * Performs the necessary processing to return the current change number
109     * @return the current change number
110     */
111    public Long getCurrentChangeNumber()
112    {
113        return changeNumber.get();
114    }
115    
116    /**
117     * Performs the necessary processing to return sequential change numbers
118     * @return the next change number
119     */
120    public synchronized Long getNextChangeNumber(){
121        return changeNumber.incrementAndGet();
122    }
123    
124    /**
125     * Performs the necessary processing to persist the change number to the configured location
126     */
127    private boolean persist()
128    {
129        Modification m = new Modification(ModificationType.REPLACE, ATTR_STATE_CHANGE_NUMBER, String.valueOf
130                (changeNumber.get()));
131        try
132        {
133            LDAPResult ldapResult = serverContext.getInternalRootConnection().modify(getStateEntryDN(), m);
134            return ResultCode.SUCCESS.equals(ldapResult.getResultCode());
135        } catch (LDAPException e)
136        {
137            StringBuffer message = new StringBuffer();
138            message.append("Changelog state could not be saved for change number ");
139            message.append(changeNumber.get());
140            message.append(" - ");
141            message.append(e.getDiagnosticMessage());
142            serverContext.logMessage(LogSeverity.FATAL_ERROR, message.toString());
143            return false;
144        }
145    }
146    
147    /**
148     * Performs the necessary processing to shut down shop gracefully
149     */
150    public void shutdown(){
151        barrier.shutdownRequested();
152        persist();
153    }
154    
155    /**
156     * Performs the necessary processing to persist state on a regular basis
157     */
158    @Override
159    public void runThread()
160    {
161        while ( !barrier.await() ){
162            Long currentChangeNumber=changeNumber.get();
163            if ( ! lastPersistedChangeNumber.equals(currentChangeNumber) && persist() )
164            {
165                lastPersistedChangeNumber = currentChangeNumber;
166            }
167        }
168    }
169}