001package com.pingidentity.ds.plugin; 002 003import com.unboundid.directory.sdk.common.api.ServerThread; 004import com.unboundid.directory.sdk.common.types.LogSeverity; 005import com.unboundid.directory.sdk.common.types.ServerContext; 006import com.unboundid.ldap.sdk.*; 007import com.unboundid.util.FixedRateBarrier; 008 009import java.util.concurrent.atomic.AtomicLong; 010 011/** 012 * This class provides facilities to handle the Snapshot Changelog change number in a consistent and resilient manner 013 */ 014public class SnapshotChangeNumberPersister implements ServerThread 015{ 016 private final ServerContext serverContext; 017 private final SnapshotChangelog snapshotChanglog; 018 private AtomicLong changeNumber; 019 private static final String STATE_CLASS = "state"; 020 private static final Long INITIAL_CHANGE_NUMBER = 0L; 021 private DN baseDN; 022 private FixedRateBarrier barrier = new FixedRateBarrier(1000L,3); 023 private static final String ATTR_STATE_CHANGE_NUMBER = "scl-last-change-number"; 024 Long lastPersistedChangeNumber = 0L; 025 026 /** 027 * Performs the necessary processing to generate the DN of the entry containing the state of the enhanced changelog 028 * 029 * @return a String with the DN of the changelog state entry 030 */ 031 private String getStateEntryDN() 032 { 033 return "uid=state," + baseDN; 034 } 035 036 /** 037 * This constructor either loads the current changelog number value or create a new state entry 038 * @param snapshotChangelog the snapshotchangelog object 039 * @throws LDAPException if an error is encountered 040 */ 041 public SnapshotChangeNumberPersister(SnapshotChangelog snapshotChangelog) throws LDAPException 042 { 043 this.snapshotChanglog = snapshotChangelog; 044 this.baseDN=snapshotChangelog.baseDN; 045 this.serverContext = snapshotChangelog.serverContext; 046 SearchResultEntry entry = null; 047 048 Long persistedChangeNumber = INITIAL_CHANGE_NUMBER; 049 try 050 { 051 entry = serverContext.getInternalRootConnection().getEntry(baseDN.toNormalizedString()); 052 } catch (LDAPException ldape) 053 { 054 serverContext.debugCaught(ldape); 055 } 056 if (entry == null) 057 { 058 Entry rootEntry = new Entry(baseDN); 059 rootEntry.addAttribute(SnapshotChangelog.OBJECT_CLASS, "javaContainer", "extensibleObject"); 060 for (Attribute attribute : baseDN.getRDN().getAttributes()) 061 { 062 rootEntry.addAttribute(attribute); 063 } 064 serverContext.getInternalRootConnection().add(rootEntry); 065 } 066 067 entry = serverContext.getInternalRootConnection().getEntry(getStateEntryDN()); 068 if (entry != null) 069 { 070 Long candidateCn = entry.getAttributeValueAsLong(ATTR_STATE_CHANGE_NUMBER); 071 if (candidateCn != null && candidateCn > INITIAL_CHANGE_NUMBER) 072 { 073 persistedChangeNumber = candidateCn; 074 } 075 } else 076 { 077 Entry stateEntry = new Entry(getStateEntryDN()); 078 stateEntry.addAttribute(SnapshotChangelog.OBJECT_CLASS, SnapshotChangelog.CHANGELOG_PREFIX + STATE_CLASS); 079 stateEntry.addAttribute(ATTR_STATE_CHANGE_NUMBER, String.valueOf(persistedChangeNumber)); 080 serverContext.getInternalRootConnection().add(stateEntry); 081 } 082 this.changeNumber = new AtomicLong(persistedChangeNumber); 083 Long fallForwardNumber = changeNumber.get(); 084 085 // Fall-forward: guard against the last persisted change-number to be "behind" 086 Entry fallForwardEntry = null; 087 do 088 { 089 try 090 { 091 fallForwardEntry = serverContext.getInternalRootConnection().getEntry(snapshotChangelog.getChangelogEntryDN(++fallForwardNumber)); 092 } catch (LDAPException e) 093 { 094 fallForwardEntry=null; 095 if (ResultCode.NO_SUCH_OBJECT.equals(e.getResultCode())) 096 { 097 // the last entry was found 098 if ( changeNumber.get() != fallForwardNumber) 099 { 100 changeNumber = new AtomicLong(fallForwardNumber); 101 } 102 } 103 } 104 } while( fallForwardEntry != null ); 105 } 106 107 /** 108 * Performs the necessary processing to return the current change number 109 * @return the current change number 110 */ 111 public Long getCurrentChangeNumber() 112 { 113 return changeNumber.get(); 114 } 115 116 /** 117 * Performs the necessary processing to return sequential change numbers 118 * @return the next change number 119 */ 120 public synchronized Long getNextChangeNumber(){ 121 return changeNumber.incrementAndGet(); 122 } 123 124 /** 125 * Performs the necessary processing to persist the change number to the configured location 126 */ 127 private boolean persist() 128 { 129 Modification m = new Modification(ModificationType.REPLACE, ATTR_STATE_CHANGE_NUMBER, String.valueOf 130 (changeNumber.get())); 131 try 132 { 133 LDAPResult ldapResult = serverContext.getInternalRootConnection().modify(getStateEntryDN(), m); 134 return ResultCode.SUCCESS.equals(ldapResult.getResultCode()); 135 } catch (LDAPException e) 136 { 137 StringBuffer message = new StringBuffer(); 138 message.append("Changelog state could not be saved for change number "); 139 message.append(changeNumber.get()); 140 message.append(" - "); 141 message.append(e.getDiagnosticMessage()); 142 serverContext.logMessage(LogSeverity.FATAL_ERROR, message.toString()); 143 return false; 144 } 145 } 146 147 /** 148 * Performs the necessary processing to shut down shop gracefully 149 */ 150 public void shutdown(){ 151 barrier.shutdownRequested(); 152 persist(); 153 } 154 155 /** 156 * Performs the necessary processing to persist state on a regular basis 157 */ 158 @Override 159 public void runThread() 160 { 161 while ( !barrier.await() ){ 162 Long currentChangeNumber=changeNumber.get(); 163 if ( ! lastPersistedChangeNumber.equals(currentChangeNumber) && persist() ) 164 { 165 lastPersistedChangeNumber = currentChangeNumber; 166 } 167 } 168 } 169}