public final class CSV
extends com.unboundid.directory.sdk.sync.api.SyncSource
implements com.unboundid.directory.sdk.common.internal.Reconfigurable<com.unboundid.directory.sdk.sync.config.SyncSourceConfig>
The CSV header should contain valid LDAP attribute names. The 'uid' attribute is used as the RDN and is required to be present, and the source base DN is configurable using an extension argument. The DNs for the entries will be constructed as follows: uid={uid},{baseDN}. Each row in a CSV file is expected to have values for all of the LDAP attributes, not just those that have changed.
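The header-to-DN rule can be sketched in plain Java. This is a hypothetical illustration, not the extension's code. `CsvRowToEntrySketch`, `toAttributes`, and `buildDn` are names invented here. Rows are assumed to be already split into columns (the real extension uses OpenCSV for that). The sketch also does not implement RFC 4514 escaping; it simply rejects uid values that would need it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper illustrating the rule in the class description; it is
// not part of the CSV extension or the UnboundID SDK.
final class CsvRowToEntrySketch {

    // Most of the characters RFC 4514 requires to be escaped in an RDN value
    // (NUL handling is omitted in this sketch).
    private static final Pattern NEEDS_ESCAPING = Pattern.compile("[,+\"\\\\<>;]");

    private CsvRowToEntrySketch() {}

    /** Pairs each header attribute name with the value in the same column. */
    static Map<String, String> toAttributes(List<String> header, List<String> row) {
        // Every row must carry a value for every attribute, not only changed ones.
        if (header.size() != row.size()) {
            throw new IllegalArgumentException("Row has " + row.size()
                + " values but the header has " + header.size() + " attributes");
        }
        Map<String, String> attributes = new LinkedHashMap<>();
        for (int i = 0; i < header.size(); i++) {
            attributes.put(header.get(i), row.get(i));
        }
        return attributes;
    }

    /** Builds uid={uid},{baseDN}, refusing uid values that would need escaping. */
    static String buildDn(Map<String, String> attributes, String baseDn) {
        String uid = attributes.get("uid");
        if (uid == null || uid.isEmpty()) {
            throw new IllegalArgumentException("The 'uid' attribute is required");
        }
        // This sketch does not implement DN escaping, so it rejects such values.
        if (NEEDS_ESCAPING.matcher(uid).find() || uid.startsWith("#")
                || uid.startsWith(" ") || uid.endsWith(" ")) {
            throw new IllegalArgumentException("uid value needs DN escaping: " + uid);
        }
        return "uid=" + uid + "," + baseDn;
    }

    public static void main(String[] args) {
        List<String> header = List.of("uid", "givenName", "sn", "mail");
        List<String> row = List.of("jdoe", "John", "Doe", "jdoe@example.com");
        Map<String, String> attributes = toAttributes(header, row);
        // Prints uid=jdoe,ou=People,dc=example,dc=com
        System.out.println(buildDn(attributes, "ou=People,dc=example,dc=com"));
    }
}
```

Rejecting instead of escaping keeps the sketch short. A production source would escape the value or use an LDAP library's DN builder.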
This extension creates ChangeRecords using ChangeType.MODIFY, so when using this example it is important to set modifies-as-creates=true on the Sync Class in the configuration. This allows the Sync Pipe to create entries that don't already exist. For the sake of simplicity, this example does not handle DELETE operations.
This extension uses the OpenCSV library, which is available at
The following arguments are defined:
Constructor and Description |
---|
CSV() |
Modifier and Type | Method and Description |
---|---|
void | acknowledgeCompletedOps(LinkedList<com.unboundid.directory.sdk.sync.types.SyncOperation> completedOps) Provides a way for the Identity Data Sync Server to acknowledge back to this extension which sync operations it has processed. |
com.unboundid.ldap.sdk.ResultCode | applyConfiguration(com.unboundid.directory.sdk.sync.config.SyncSourceConfig syncSourceConfig, com.unboundid.util.args.ArgumentParser parser, List<String> adminActionsRequired, List<String> messages) Applies the provided configuration to this extension. |
void | defineConfigArguments(com.unboundid.util.args.ArgumentParser parser) Updates the provided argument parser to define any configuration arguments which may be used by this extension. |
com.unboundid.ldap.sdk.Entry | fetchEntry(com.unboundid.directory.sdk.sync.types.SyncOperation operation) Returns a full source entry (in LDAP form) from the source, corresponding to the ChangeRecord that is passed in through the SyncOperation. |
void | finalizeSyncSource() This hook is called when a Sync Pipe shuts down, when the resync process shuts down, or when the set-startpoint subcommand (from the realtime-sync command line tool) is finished. |
String | getCurrentEndpointURL() Returns the URL or path identifying the source endpoint from which this extension is transmitting data. |
String[] | getExtensionDescription() Retrieves a human-readable description for this extension. |
String | getExtensionName() Retrieves a human-readable name for this extension. |
List<com.unboundid.directory.sdk.sync.types.ChangeRecord> | getNextBatchOfChanges(int maxChanges, AtomicLong numStillPending) Returns the next batch of change records from the source. |
Serializable | getStartpoint() Gets the current value of the startpoint for change detection. |
void | initializeSyncSource(com.unboundid.directory.sdk.sync.types.SyncServerContext serverContext, com.unboundid.directory.sdk.sync.config.SyncSourceConfig config, com.unboundid.util.args.ArgumentParser parser) This hook is called when a Sync Pipe first starts up, when the resync process first starts up, or when the set-startpoint subcommand is called from the realtime-sync command line tool. |
boolean | isConfigurationAcceptable(com.unboundid.directory.sdk.sync.config.SyncSourceConfig syncSourceConfig, com.unboundid.util.args.ArgumentParser argumentParser, List<String> unacceptableReasons) Verifies that the configuration for this extension is acceptable. |
void | listAllEntries(BlockingQueue<com.unboundid.directory.sdk.sync.types.ChangeRecord> outputQueue) Gets a list of all the entries in the source endpoint. |
void | listAllEntries(Iterator<String> inputLines, BlockingQueue<com.unboundid.directory.sdk.sync.types.ChangeRecord> outputQueue) Gets a list of all the entries in the source from a given file input. |
void | setStartpoint(com.unboundid.directory.sdk.sync.types.SetStartpointOptions options) This method should effectively set the starting point for synchronization to the place specified by the options parameter. |
public String getExtensionName()
getExtensionName
in interface com.unboundid.directory.sdk.common.internal.UnboundIDExtension
getExtensionName
in class com.unboundid.directory.sdk.sync.api.SyncSource
public String[] getExtensionDescription()
getExtensionDescription
in interface com.unboundid.directory.sdk.common.internal.UnboundIDExtension
getExtensionDescription
in class com.unboundid.directory.sdk.sync.api.SyncSource
Returns a human-readable description for this extension, or null or an empty array if no description should be available.
public void defineConfigArguments(com.unboundid.util.args.ArgumentParser parser) throws com.unboundid.util.args.ArgumentException
defineConfigArguments
in interface com.unboundid.directory.sdk.common.internal.Configurable
defineConfigArguments
in class com.unboundid.directory.sdk.sync.api.SyncSource
parser - The argument parser to be updated with the configuration arguments which may be used by this extension.
com.unboundid.util.args.ArgumentException - If a problem is encountered while updating the provided argument parser.
public boolean isConfigurationAcceptable(com.unboundid.directory.sdk.sync.config.SyncSourceConfig syncSourceConfig, com.unboundid.util.args.ArgumentParser argumentParser, List<String> unacceptableReasons)
isConfigurationAcceptable
in interface com.unboundid.directory.sdk.common.internal.Reconfigurable<com.unboundid.directory.sdk.sync.config.SyncSourceConfig>
syncSourceConfig - the sync source configuration
argumentParser - the argument parser
unacceptableReasons - a list of reasons explaining why the configuration is not acceptable
public com.unboundid.ldap.sdk.ResultCode applyConfiguration(com.unboundid.directory.sdk.sync.config.SyncSourceConfig syncSourceConfig, com.unboundid.util.args.ArgumentParser parser, List<String> adminActionsRequired, List<String> messages)
applyConfiguration
in interface com.unboundid.directory.sdk.common.internal.Reconfigurable<com.unboundid.directory.sdk.sync.config.SyncSourceConfig>
syncSourceConfig - the sync source configuration object
parser - the argument parser
adminActionsRequired - a list of administrative actions to take
messages - a list of error messages
public void initializeSyncSource(com.unboundid.directory.sdk.sync.types.SyncServerContext serverContext, com.unboundid.directory.sdk.sync.config.SyncSourceConfig config, com.unboundid.util.args.ArgumentParser parser)
This hook is called when a Sync Pipe first starts up, when the resync process first starts up, or when the set-startpoint subcommand is called from the realtime-sync command line tool. This method should store the SyncServerContext in a class member so that it can be used elsewhere in the implementation.
initializeSyncSource
in class com.unboundid.directory.sdk.sync.api.SyncSource
serverContext - A handle to the server context for the server in which this extension is running.
config - The general configuration for this sync source.
parser - The argument parser which has been initialized from the configuration for this sync source.
public void finalizeSyncSource()
finalizeSyncSource
in class com.unboundid.directory.sdk.sync.api.SyncSource
public void setStartpoint(com.unboundid.directory.sdk.sync.types.SetStartpointOptions options) throws com.unboundid.directory.sdk.sync.types.EndpointException
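This method should effectively set the starting point for synchronization to the place specified by the options parameter. This should cause all changes previous to the specified start point to be disregarded and only changes after that point to be returned by getNextBatchOfChanges(int, AtomicLong).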
There are several different startpoint types (see SetStartpointOptions), and this implementation is not required to support them all. If the specified startpoint type is unsupported, this method should throw an UnsupportedOperationException.
IMPORTANT: The RESUME_AT_SERIALIZABLE startpoint type must be supported by your implementation, because this is used when a Sync Pipe first starts up. The Serializable in this case is the same type that is returned by getStartpoint(); the Identity Data Sync Server persists it and passes it back in on a restart.
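A minimal sketch of how an implementation might dispatch on the startpoint type, assuming the startpoint is a single `Long` counter of acknowledged change rows. `SketchStartpointType` is a hypothetical stand-in for the types in SetStartpointOptions. Apart from RESUME_AT_SERIALIZABLE, its constant names are invented and do not mirror the SDK enum.

```java
import java.io.Serializable;

// Hypothetical stand-in for the startpoint types in SetStartpointOptions.
// Only RESUME_AT_SERIALIZABLE is named after the documented type; the other
// constants are invented for this sketch.
enum SketchStartpointType { RESUME_AT_SERIALIZABLE, FROM_BEGINNING, FROM_END, RESUME_AT_TIME }

final class StartpointDispatchSketch {
    // Assumed startpoint: number of change rows acknowledged so far.
    private long startpoint;
    // Assumed number of change rows currently known at the source.
    private final long endOfChanges;

    StartpointDispatchSketch(long endOfChanges) {
        this.endOfChanges = endOfChanges;
    }

    void setStartpoint(SketchStartpointType type, Serializable value) {
        switch (type) {
            case RESUME_AT_SERIALIZABLE:
                // Mandatory: the value is whatever getStartpoint() returned before.
                if (!(value instanceof Long)) {
                    throw new IllegalArgumentException("Expected a Long startpoint: " + value);
                }
                startpoint = (Long) value;
                break;
            case FROM_BEGINNING:
                startpoint = 0L;
                break;
            case FROM_END:
                // Disregard every change that already exists at the source.
                startpoint = endOfChanges;
                break;
            default:
                // Optional types may be rejected, as the text allows.
                throw new UnsupportedOperationException("Unsupported startpoint type: " + type);
        }
    }

    Serializable getStartpoint() {
        return startpoint; // autoboxed to Long, which is Serializable
    }
}
```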
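This method can be called from two different contexts: when the set-startpoint subcommand of the realtime-sync command line tool is used, and immediately after a Sync Pipe starts up (before the first call to getNextBatchOfChanges(int, AtomicLong)).
setStartpoint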
in class com.unboundid.directory.sdk.sync.api.SyncSource
options - an object which indicates where exactly to start synchronizing (e.g. the end of the changelog, a specific change number, a certain time ago, etc.)
com.unboundid.directory.sdk.sync.types.EndpointException - if there is any error while setting the start point
public Serializable getStartpoint()
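Gets the current value of the startpoint for change detection. This method should return the value that is updated by acknowledgeCompletedOps(LinkedList).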
This method is called periodically and the return value is saved in the persistent state for the Sync Pipe that uses this extension as its Sync Source.
IMPORTANT: The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this extension (via acknowledgeCompletedOps(LinkedList)). Otherwise it will be possible for changes to be missed when the Identity Data Sync Server is restarted or a connection error occurs.
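Because the value returned by getStartpoint() is persisted and handed back on restart, it should survive a round trip through storage. The sketch below uses Java serialization only to demonstrate that property; the server's actual persistence format is not described here. `CsvStartpoint` (a file name plus an acknowledged-row count) and `StartpointRoundTripSketch` are hypothetical, not types from the CSV extension.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical startpoint shape for a CSV source: the file being processed and
// how many of its rows have been acknowledged. Immutable, so a snapshot taken
// by getStartpoint() cannot change after it is handed to the server.
final class CsvStartpoint implements Serializable {
    private static final long serialVersionUID = 1L;

    final String fileName;
    final long acknowledgedRows;

    CsvStartpoint(String fileName, long acknowledgedRows) {
        this.fileName = fileName;
        this.acknowledgedRows = acknowledgedRows;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof CsvStartpoint)) {
            return false;
        }
        CsvStartpoint that = (CsvStartpoint) other;
        return acknowledgedRows == that.acknowledgedRows
            && Objects.equals(fileName, that.fileName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(fileName, acknowledgedRows);
    }
}

final class StartpointRoundTripSketch {
    /** Writes the startpoint with Java serialization (illustration only). */
    static byte[] persist(Serializable startpoint) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(startpoint);
        }
        return bytes.toByteArray();
    }

    /** Reads back what persist() wrote, as would happen before setStartpoint(). */
    static Serializable restore(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Serializable) in.readObject();
        }
    }
}
```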
getStartpoint
in class com.unboundid.directory.sdk.sync.api.SyncSource
Returns the current startpoint value, which is passed back in to setStartpoint(SetStartpointOptions) when the Sync Pipe starts up.
public String getCurrentEndpointURL()
getCurrentEndpointURL
in class com.unboundid.directory.sdk.sync.api.SyncSource
public com.unboundid.ldap.sdk.Entry fetchEntry(com.unboundid.directory.sdk.sync.types.SyncOperation operation) throws com.unboundid.directory.sdk.sync.types.EndpointException
Returns a full source entry (in LDAP form) from the source, corresponding to the ChangeRecord that is passed in through the SyncOperation. This method should perform any queries necessary to gather the latest values for all the attributes to be synchronized.
This method must be thread safe, as it will be called repeatedly and concurrently by each of the Sync Pipe worker threads as they process entries.
If the original ChangeRecord has the full entry already set on it (which can be done using the ChangeRecord.Builder#fullEntry(Entry)), then this method will not get called, and the Identity Data Sync Server will automatically use the full entry from the ChangeRecord. In this case, the implementation can always return null.
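One way to satisfy the thread-safety requirement is to serve lookups from a concurrent map of immutable snapshots. In this hypothetical sketch, an entry is modeled as a `Map` of attribute names to values, and the change record is reduced to its uid. `FetchEntrySketch` and `recordRow` are illustrative names, not SDK API.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical sketch: an "entry" is a Map of attribute names to values, and the
// change record is reduced to the uid that identifies it.
final class FetchEntrySketch {
    // ConcurrentHashMap allows many worker threads to read while rows are updated.
    private final ConcurrentHashMap<String, Map<String, String>> latestRowByUid =
        new ConcurrentHashMap<>();

    /** Stores an immutable copy so readers never observe a half-updated row. */
    void recordRow(String uid, Map<String, String> attributes) {
        latestRowByUid.put(uid, Map.copyOf(attributes));
    }

    /** Returns the latest attributes for the uid, or null if this sketch has none. */
    Map<String, String> fetchEntry(String uid) {
        return latestRowByUid.get(uid);
    }

    public static void main(String[] args) {
        FetchEntrySketch source = new FetchEntrySketch();
        source.recordRow("jdoe", Map.of("uid", "jdoe", "sn", "Doe"));
        // Simulate concurrent worker threads calling fetchEntry.
        boolean allFound = IntStream.range(0, 1_000).parallel()
            .mapToObj(i -> source.fetchEntry("jdoe"))
            .allMatch(Objects::nonNull);
        System.out.println("All concurrent fetches succeeded: " + allFound);
    }
}
```

Storing immutable copies means no lock is needed on the read path; writers simply replace the whole snapshot for a uid.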
fetchEntry
in class com.unboundid.directory.sdk.sync.api.SyncSource
operation - the SyncOperation which identifies the source "entry" to fetch. The ChangeRecord can be obtained by calling operation.getChangeRecord(). These ChangeRecords are generated by getNextBatchOfChanges(int, AtomicLong) or by listAllEntries(BlockingQueue).
com.unboundid.directory.sdk.sync.types.EndpointException - if there is an error fetching the entry
public void acknowledgeCompletedOps(LinkedList<com.unboundid.directory.sdk.sync.types.SyncOperation> completedOps) throws com.unboundid.directory.sdk.sync.types.EndpointException
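Provides a way for the Identity Data Sync Server to acknowledge back to this extension which sync operations it has processed. This method should update the official startpoint which was set by setStartpoint(SetStartpointOptions) and is returned by getStartpoint().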
IMPORTANT: The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this extension (via this method). Otherwise it will be possible for changes to be missed when the Identity Data Sync Server is restarted or a connection error occurs.
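A sketch of acknowledgement-driven startpoint tracking, assuming each detected change receives a consecutive sequence number. This is a hypothetical scheme, and `SketchCompletedOp` stands in for SyncOperation. To avoid skipping a change that has not been acknowledged yet, the official startpoint advances only across an unbroken run of acknowledged sequence numbers.

```java
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

// Hypothetical stand-in for a completed SyncOperation: only the sequence
// number assigned to the change when it was detected is modeled.
final class SketchCompletedOp {
    final long sequence;

    SketchCompletedOp(long sequence) {
        this.sequence = sequence;
    }
}

final class AcknowledgeSketch {
    // Official startpoint: every change up to this sequence has been acknowledged.
    private long officialStartpoint;
    // Acknowledged sequences that still lie beyond a gap.
    private final Set<Long> acknowledgedAhead = new HashSet<>();

    AcknowledgeSketch(long initialStartpoint) {
        this.officialStartpoint = initialStartpoint;
    }

    synchronized void acknowledgeCompletedOps(LinkedList<SketchCompletedOp> completedOps) {
        for (SketchCompletedOp op : completedOps) {
            if (op.sequence > officialStartpoint) {
                acknowledgedAhead.add(op.sequence);
            }
        }
        // Advance only across an unbroken run so no unacknowledged change is skipped.
        while (acknowledgedAhead.remove(officialStartpoint + 1)) {
            officialStartpoint++;
        }
    }

    /** The value getStartpoint() would report; unaffected by batch retrieval. */
    synchronized long getStartpoint() {
        return officialStartpoint;
    }
}
```

If acknowledgements for changes 1 and 3 arrive first, the startpoint stops at 1. It jumps to 3 only once change 2 is acknowledged.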
acknowledgeCompletedOps
in class com.unboundid.directory.sdk.sync.api.SyncSource
completedOps - a list of SyncOperations that have finished processing. The records are listed in the order they were first detected.
com.unboundid.directory.sdk.sync.types.EndpointException - if there is an error acknowledging the changes back to the database
public List<com.unboundid.directory.sdk.sync.types.ChangeRecord> getNextBatchOfChanges(int maxChanges, AtomicLong numStillPending) throws com.unboundid.directory.sdk.sync.types.EndpointException
On the first invocation, this should return changes starting from the startpoint that was set by setStartpoint(SetStartpointOptions).
This method is also responsible for updating the internal state such that
subsequent invocations do not return duplicate changes.
The resulting list should be limited by maxChanges. The numStillPending reference should be set to the estimated number of changes that haven't yet been retrieved from the source endpoint when this method returns, or zero if all the current changes have been retrieved.
IMPORTANT: While this method needs to keep track of which changes have already been returned so that it does not return them again, it should NOT modify the official startpoint. The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this extension (via acknowledgeCompletedOps(LinkedList)). Otherwise it will be possible for changes to be missed when the Identity Data Sync Server is restarted or a connection error occurs. The startpoint should not change as a result of this method.
This method does not need to be thread-safe. It will be invoked repeatedly by a single thread, based on the polling interval set in the Sync Pipe configuration.
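The contract for getNextBatchOfChanges can be sketched with a deque of undelivered changes, assuming changes are identified by uid strings rather than ChangeRecord objects. `NextBatchSketch` and `detect` are hypothetical names. Removing delivered items is the internal bookkeeping that prevents duplicates; the official startpoint is intentionally not touched.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: detected changes are uid strings kept in detection
// order; a real implementation would return ChangeRecord objects.
final class NextBatchSketch {
    // Changes detected but not yet handed to the server.
    private final Deque<String> undelivered = new ArrayDeque<>();

    void detect(String uid) {
        undelivered.addLast(uid);
    }

    List<String> getNextBatchOfChanges(int maxChanges, AtomicLong numStillPending) {
        List<String> batch = new ArrayList<>();
        // Removing each delivered change is the internal bookkeeping that keeps
        // later calls from returning it again. The official startpoint is
        // deliberately not touched; that happens only on acknowledgement.
        while (batch.size() < maxChanges && !undelivered.isEmpty()) {
            batch.add(undelivered.pollFirst());
        }
        numStillPending.set(undelivered.size());
        return batch; // an empty list, never null, when nothing is new
    }
}
```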
getNextBatchOfChanges
in class com.unboundid.directory.sdk.sync.api.SyncSource
maxChanges - the maximum number of changes to retrieve
numStillPending - this should be set to the number of unretrieved changes that are still pending after this batch has been retrieved. This will be passed in as zero, and may be left that way if the actual value cannot be determined.
Returns a list of ChangeRecord instances, each corresponding to a single change at the source endpoint. If there are no new changes to return, this method should return an empty list.
com.unboundid.directory.sdk.sync.types.EndpointException - if there is any error while retrieving the next batch of changes
public void listAllEntries(BlockingQueue<com.unboundid.directory.sdk.sync.types.ChangeRecord> outputQueue) throws com.unboundid.directory.sdk.sync.types.EndpointException
Gets a list of all the entries in the source endpoint. The default implementation throws an UnsupportedOperationException; subclasses should override if the resync functionality is needed.
The outputQueue should contain ChangeRecord objects with the ChangeType set to null to indicate that these are resync operations.
This method should not return until all the entries at the source have been
added to the output queue. Separate threads will concurrently drain entries
from the queue and process them. The queue typically should not contain
full entries, but rather ChangeRecord objects which identify the full
source entries. These objects are then individually passed in to
fetchEntry(SyncOperation). Therefore, it is important to make sure
that the ChangeRecord instances contain enough identifiable information
(e.g. primary keys) for each entry so that the entry can be found again.
The lifecycle of resync is similar to that of real-time sync, with a few differences:
Alternatively, the full entry can be set on the ChangeRecord within this method, which will cause the "fetch full entry" step to be skipped. In this case the Identity Data Sync Server will just use the entry specified on the ChangeRecord.
If the total set of entries is very large, it is fine to split up the work into multiple network queries within this method. The queue will not grow out of control because it blocks when it becomes full. The queue capacity is fixed at 1000.
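The producer/consumer arrangement described above can be modeled with an `ArrayBlockingQueue` of capacity 1000, matching the stated queue size. This is a hypothetical simulation, not the server's resync engine. Identifier strings stand in for ChangeRecord objects, the worker threads stand in for the server's fetch-and-apply threads, and the empty-string end marker exists only in this sketch.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ListAllEntriesSketch {
    // End-of-stream marker for this sketch only; real uids are never empty.
    private static final String END = "";

    /** Producer: queues one identifier per row; put() blocks while the queue is full. */
    static void listAllEntries(List<String> uids, BlockingQueue<String> outputQueue)
            throws InterruptedException {
        for (String uid : uids) {
            outputQueue.put(uid);
        }
    }

    /** Runs the producer against draining workers; returns how many ids were handled. */
    static int runResync(List<String> uids, int workers) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000); // stated capacity
        AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String uid = queue.take();
                        if (uid.isEmpty()) {
                            return; // end marker: this worker is done
                        }
                        handled.incrementAndGet(); // stand-in for fetch, diff, and apply
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        listAllEntries(uids, queue); // returns only after every id is queued
        for (int i = 0; i < workers; i++) {
            queue.put(END); // one marker per worker, queued after all real ids
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return handled.get();
    }
}
```

Because the queue is bounded, the producer pauses whenever the workers fall behind, which is why splitting a very large listing into many queries does not exhaust memory.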
listAllEntries
in class com.unboundid.directory.sdk.sync.api.SyncSource
outputQueue - a queue of ChangeRecord objects which will be individually fetched via fetchEntry(SyncOperation)
com.unboundid.directory.sdk.sync.types.EndpointException - if there is an error retrieving the list of entries to resync
public void listAllEntries(Iterator<String> inputLines, BlockingQueue<com.unboundid.directory.sdk.sync.types.ChangeRecord> outputQueue) throws com.unboundid.directory.sdk.sync.types.EndpointException
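Gets a list of all the entries in the source from a given file input. The default implementation throws an UnsupportedOperationException; subclasses should override if the resync functionality is needed for specific records, which can be specified in the input file.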
The format for the inputLines (i.e. the content of the file) is user-defined; it may be key/value pairs, primary keys, or full SQL statements, for example. The use of this method is triggered via the --sourceInputFile argument on the resync CLI. The outputQueue should contain ChangeRecord objects with the ChangeType set to null to indicate that these are resync operations.
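Because the input format is user-defined, here is one possible convention. The sketch assumes the file lists one uid per line and that blank lines and lines starting with `#` are ignored. `InputFileResyncSketch` is a hypothetical name, and strings again stand in for ChangeRecord objects.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;

// Hypothetical input convention for this sketch: one uid per line,
// blank lines and '#' comment lines ignored.
final class InputFileResyncSketch {
    /** Queues one identifier per meaningful line; returns how many were queued. */
    static int listAllEntries(Iterator<String> inputLines, BlockingQueue<String> outputQueue)
            throws InterruptedException {
        int queued = 0;
        while (inputLines.hasNext()) {
            String line = inputLines.next().trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // not a record identifier
            }
            outputQueue.put(line); // blocks if the queue is full
            queued++;
        }
        return queued;
    }
}
```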
This method should not return until all the entries specified by the input
file have been added to the output queue. Separate threads will
concurrently drain entries from the queue and process them. The queue
typically should not contain full entries, but rather ChangeRecord objects
which identify the full source entries. These objects are then individually
passed in to fetchEntry(SyncOperation). Therefore, it is important
to make sure that the ChangeRecord instances contain enough identifiable
information (e.g. primary keys) for each entry so that the entry can be
found again.
The lifecycle of resync is similar to that of real-time sync, with a few differences:
Alternatively, the full entry can be set on the ChangeRecord within this method, which will cause the "fetch full entry" step to be skipped. In this case the Identity Data Sync Server will just use the entry specified on the ChangeRecord.
If the total set of entries is very large, it is fine to split up the work into multiple network queries within this method. The queue will not grow out of control because it blocks when it becomes full. The queue capacity is fixed at 1000.
listAllEntries
in class com.unboundid.directory.sdk.sync.api.SyncSource
inputLines - an Iterator containing the lines from the specified input file to resync (this is specified on the CLI for the resync command). These lines can be any format, for example a set of primary keys, a set of WHERE clauses, a set of full SQL queries, etc.
outputQueue - a queue of ChangeRecord objects which will be individually fetched via fetchEntry(SyncOperation)
com.unboundid.directory.sdk.sync.types.EndpointException - if there is an error retrieving the list of entries to resync
Copyright © 2019 Ping Identity. All rights reserved.