001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * docs/licenses/cddl.txt 011 * or http://www.opensource.org/licenses/cddl1.php. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * docs/licenses/cddl.txt. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2011-2018 Ping Identity Corporation 026 */ 027package com.pingidentity.sync; 028 029import com.pingidentity.util.MSGraphAPI; 030import com.pingidentity.util.MSGraphResult; 031import com.unboundid.directory.sdk.sync.api.SyncSource; 032import com.unboundid.directory.sdk.sync.config.SyncSourceConfig; 033import com.unboundid.directory.sdk.sync.types.*; 034import com.unboundid.ldap.sdk.ChangeType; 035import com.unboundid.ldap.sdk.DN; 036import com.unboundid.ldap.sdk.Entry; 037import com.unboundid.ldap.sdk.LDAPException; 038import com.unboundid.util.args.ArgumentException; 039import com.unboundid.util.args.ArgumentParser; 040import com.unboundid.util.args.StringArgument; 041import com.unboundid.util.json.JSONObject; 042 043import java.io.Serializable; 044import java.io.UnsupportedEncodingException; 045import java.net.URLEncoder; 046import java.util.Iterator; 047import java.util.LinkedList; 048import java.util.List; 049import java.util.concurrent.BlockingQueue; 050import java.util.concurrent.atomic.AtomicLong; 051 052 053/** 054 * This class provides a simple example of a generic Sync Source which will 055 * detect CSV files in a given directory and synchronize them to an 056 * LDAP destination. More specifically, the extension will look for any files 057 * with the ".csv" extension, and interpret each row to be a single LDAP entry 058 * to be synchronized. After a file has been completely processed, it will be 059 * moved to a separate directory. 060 * <p> 061 * The CSV header should contain valid LDAP attribute names. The 'uid' attribute 062 * is used as the RDN and is required to be present, and the source base DN 063 * is configurable using an extension argument. The DNs for the entries will be 064 * constructed as follows: uid={uid},{baseDN}. Each row in a CSV file is 065 * expected to have values for all of the LDAP attributes, not just those that 066 * have changed. 067 * <p> 068 * This extension creates ChangeRecords using {@link ChangeType#MODIFY}, so when 069 * using this example, it's important to set <i>modifies-as-creates=true</i> 070 * on the Sync Class in the configuration. This will allow the Sync Pipe to 071 * create entries that don't already exist. For the sake of simplicity, this 072 * example does not handle DELETE operations. 073 * <p> 074 * This extension uses the OpenCSV library, which is available at 075 * <a href="http://opencsv.sourceforge.net">http://opencsv.sourceforge.net</a> 076 * and provides simple CSV parsing. 077 * <p> 078 * The following arguments are defined: 079 * <UL> 080 * <LI>csv-files-dir -- The filesystem directory to monitor for CSV files</LI> 081 * <LI>processed-files-dir -- The filesystem directory where finished CSV 082 * files should be moved</LI> 083 * <LI>base-dn -- The base DN to use when creating LDAP entries from the CSV 084 * content</LI> 085 * </UL> 086 */ 087public final class AzureADSyncSource extends SyncSource { 088 //The server context which can be used for obtaining the server state, 089 //logging, etc. 090 private SyncServerContext serverContext; 091 092 093 private static final String ARG_NAME_TENANT_ID = "tenant-id"; 094 private static final String ARG_NAME_CLIENT_ID = "client-id"; 095 private static final String ARG_NAME_CLIENT_SECRET = "client-secret"; 096 private static final String ARG_NAME_ATTRIBUTES = "attributes"; 097 private static final String ARG_NAME_FILTER = "filter"; 098 099 private MSGraphAPI msApi; 100 101 /** 102 * The set of attribute names to pull from the AzureAD users call 103 * 104 * Example: {"objectId", "userPrincipalName", "displayName", "givenName", "surname", "mail"} 105 */ 106 private String attributeNames = null; 107 108 /** 109 * The filter used to select users from the AzureAD users call 110 */ 111 private String filter = null; 112 113 /** 114 * Track a BaseDN that will be used to create the internal sync entry DN 115 * This can be changed with an argument to the plugin 116 */ 117 private DN baseDN = null; 118 119 120 /** 121 * Retrieves a human-readable name for this extension. 122 * 123 * @return A human-readable name for this extension. 124 */ 125 @Override 126 public String getExtensionName() { 127 return "AzureAD Windows Graph API Sync Source"; 128 } 129 130 131 /** 132 * Retrieves a human-readable description for this extension. Each element 133 * of the array that is returned will be considered a separate paragraph in 134 * generated documentation. 135 * 136 * @return A human-readable description for this extension, or {@code null} 137 * or an empty array if no description should be available. 138 */ 139 @Override 140 public String[] getExtensionDescription() { 141 return new String[]{ 142 "This extension implements a Sync Source which can be used " + 143 "to resync users from an AzureAD service." 144 }; 145 } 146 147 148 /** 149 * Updates the provided argument parser to define any configuration arguments 150 * which may be used by this extension. The argument parser may also be 151 * updated to define relationships between arguments (e.g. to specify 152 * required, exclusive, or dependent argument sets). 153 * 154 * @param parser The argument parser to be updated with the configuration 155 * arguments which may be used by this extension. 156 * @throws ArgumentException If a problem is encountered while updating the 157 * provided argument parser. 158 */ 159 @Override 160 public void defineConfigArguments(final ArgumentParser parser) 161 throws ArgumentException { 162 163 StringArgument newStringArg; 164 165 newStringArg = new StringArgument( 166 null, ARG_NAME_TENANT_ID, true, 1, "{tenantId}", 167 "The tenantId used to call the AzureAD Graph API"); 168 newStringArg.setSensitive(true); 169 parser.addArgument(newStringArg); 170 171 newStringArg = new StringArgument( 172 null, ARG_NAME_CLIENT_ID, true, 1, "{clientId}", 173 "The clientId used to call the AzureAD Graph API"); 174 newStringArg.setSensitive(true); 175 parser.addArgument(newStringArg); 176 177 newStringArg = new StringArgument( 178 null, ARG_NAME_CLIENT_SECRET, true, 1, "{clientSecret}", 179 "The clientSecret used in conjunction with the clientId to call the AzureAD Graph API"); 180 newStringArg.setSensitive(true); 181 parser.addArgument(newStringArg); 182 183 newStringArg = new StringArgument( 184 null, ARG_NAME_ATTRIBUTES, false, 1, "{attributes}", 185 "The attributes returned from the AzureAD Graph API." + 186 " Comma separated list of attribute names (i.e. first,last,address"); 187 parser.addArgument(newStringArg); 188 189 newStringArg = new StringArgument( 190 null, ARG_NAME_FILTER, false, 1, "{filter}", 191 "The filter used with the AzureAD Graph API."); 192 parser.addArgument(newStringArg); 193 } 194 195 196 /** 197 * This hook is called when a Sync Pipe first starts up, when the 198 * <i>resync</i> process first starts up, or when the set-startpoint 199 * subcommand is called from the <i>realtime-sync</i> command line tool. 200 * Any initialization of this sync source should be performed here. This 201 * method should generally store the {@link SyncServerContext} in a class 202 * member so that it can be used elsewhere in the implementation. 203 * 204 * @param serverContext A handle to the server context for the server in 205 * which this extension is running. 206 * @param config The general configuration for this sync source. 207 * @param parser The argument parser which has been initialized from 208 * the configuration for this sync source. 209 */ 210 @Override 211 public void initializeSyncSource(final SyncServerContext serverContext, 212 final SyncSourceConfig config, 213 final ArgumentParser parser) { 214 this.serverContext = serverContext; 215 216 StringArgument newStringArg; 217 218 newStringArg = 219 (StringArgument) parser.getNamedArgument(ARG_NAME_TENANT_ID); 220 String tenantId = newStringArg.getValue(); 221 222 newStringArg = 223 (StringArgument) parser.getNamedArgument(ARG_NAME_CLIENT_ID); 224 String clientId = newStringArg.getValue(); 225 226 newStringArg = (StringArgument) parser.getNamedArgument(ARG_NAME_CLIENT_SECRET); 227 String clientSecret = newStringArg.getValue(); 228 229 newStringArg = (StringArgument) parser.getNamedArgument(ARG_NAME_ATTRIBUTES); 230 attributeNames = newStringArg.getValue(); 231 232 newStringArg = (StringArgument) parser.getNamedArgument(ARG_NAME_FILTER); 233 filter = newStringArg.getValue(); 234 235 msApi = new MSGraphAPI(tenantId, clientId, clientSecret); 236 } 237 238 239 /** 240 * This hook is called when a Sync Pipe shuts down, when the <i>resync</i> 241 * process shuts down, or when the set-startpoint subcommand (from the 242 * <i>realtime-sync</i> command line tool) is finished. Any clean up of this 243 * sync source should be performed here. 244 */ 245 @Override 246 public void finalizeSyncSource() { 247 //No implementation necessary. 248 } 249 250 251 /** 252 * This method should effectively set the starting point for synchronization 253 * to the place specified by the <code>options</code> parameter. This should 254 * cause all changes previous to the specified start point to be disregarded 255 * and only changes after that point to be returned by 256 * {@link #getNextBatchOfChanges(int, AtomicLong)}. 257 * <p> 258 * There are several different startpoint types (see 259 * {@link SetStartpointOptions}), and this implementation is not required to 260 * support them all. If the specified startpoint type is unsupported, this 261 * method should throw an {@link UnsupportedOperationException}. 262 * <p> 263 * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type 264 * must be supported by your implementation, because this is used when a Sync 265 * Pipe first starts up. The {@link Serializable} in this case is the same 266 * type that is returned by {@link #getStartpoint()}; the 267 * Data Sync Server persists it and passes it back in on a restart. 268 * <p> 269 * This method can be called from two different contexts: 270 * <ul> 271 * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used 272 * (the Sync Pipe is required to be stopped in this context)</li> 273 * <li>Immediately after a Sync Pipe starts up and a connection is first 274 * established to the source server (e.g. before the first call to 275 * {@link #getNextBatchOfChanges(int, AtomicLong)})</li> 276 * </ul> 277 * 278 * @param options an object which indicates where exactly to start synchronizing 279 * (e.g. the end of the changelog, specific change number, a certain 280 * time ago, etc) 281 * @throws EndpointException if there is any error while setting the start point 282 */ 283 @Override 284 @SuppressWarnings("unchecked") 285 public void setStartpoint(final SetStartpointOptions options) 286 throws EndpointException { 287 288 throw new IllegalArgumentException("This source doesn't support realtime sync"); 289 } 290 291 292 /** 293 * Gets the current value of the startpoint for change detection. This is the 294 * "bookmark" which indicates which changes have already been processed and 295 * which have not. In most cases, a change number is used to detect changes 296 * and is managed by the Data Sync Server, in which case this 297 * implementation needs only to return the latest acknowledged 298 * change number. In other cases, the return value may correspond to a 299 * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. 300 * In any case, this method should return the value that is updated by 301 * {@link #acknowledgeCompletedOps(LinkedList)}. 302 * <p> 303 * This method is called periodically and the return value is saved in the 304 * persistent state for the Sync Pipe that uses this extension as its Sync 305 * Source. 306 * <p> 307 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 308 * updated after a sync operation is acknowledged back to this script (via 309 * {@link #acknowledgeCompletedOps(LinkedList)}). 310 * Otherwise it will be possible for changes to be missed when the 311 * Data Sync Server is restarted or a connection error occurs. 312 * 313 * @return a value to store in the persistent state for the Sync Pipe. This is 314 * usually a change number, but if a changelog table is not used to 315 * detect changes, this value should represent some other token to 316 * pass into {@link #setStartpoint(SetStartpointOptions)} 317 * when the sync pipe starts up. 318 */ 319 @Override 320 public Serializable getStartpoint() { 321 //We return null here since realtime sync isn't supported for this source 322 return null; 323 } 324 325 326 /** 327 * Return the URL or path identifying the source endpoint 328 * from which this extension is transmitting data. This is used for logging 329 * purposes only, so it could just be a server name or hostname and port, etc. 330 * 331 * @return the path to the source endpoint 332 */ 333 @Override 334 public String getCurrentEndpointURL() { 335 return "Not Set - AzureAD URL"; 336 } 337 338 339 /** 340 * Return a full source entry (in LDAP form) from the source, corresponding 341 * to the {@link ChangeRecord} that is passed in through the 342 * {@link SyncOperation}. This method should perform any queries necessary to 343 * gather the latest values for all the attributes to be synchronized. 344 * <p> 345 * This method <b>must be thread safe</b>, as it will be called repeatedly and 346 * concurrently by each of the Sync Pipe worker threads as they process 347 * entries. 348 * <p> 349 * If the original ChangeRecord has the full entry already set on it 350 * (which can be done using the 351 * {@link 352 * com.unboundid.directory.sdk.sync.types.ChangeRecord.Builder#fullEntry(Entry)}), 353 * then this method will not get called, and the Data Sync Server 354 * will automatically use the full entry from the ChangeRecord. In this case, 355 * the implementation can always return {@code null}. 356 * 357 * @param operation the SyncOperation which identifies the source "entry" to 358 * fetch. The ChangeRecord can be obtained by calling 359 * <code>operation.getChangeRecord()</code>. 360 * These ChangeRecords are generated by 361 * {@link #getNextBatchOfChanges(int, AtomicLong)} 362 * or by 363 * {@link #listAllEntries(BlockingQueue)}. 364 * @return a full LDAP Entry, or null if no such entry exists. 365 * @throws EndpointException if there is an error fetching the entry 366 */ 367 @Override 368 public Entry fetchEntry(final SyncOperation operation) 369 throws EndpointException { 370 ChangeRecord record = operation.getChangeRecord(); 371 throw new IllegalStateException( 372 "fetchEntry() should not be called because the full entry is set " + 373 "on the ChangeRecord: " + record.toString()); 374 } 375 376 377 /** 378 * Provides a way for the Data Sync Server to acknowledge back to the 379 * script which sync operations it has processed. This method should update 380 * the official startpoint which was set by 381 * {@link #setStartpoint(SetStartpointOptions)} and is 382 * returned by {@link #getStartpoint()}. 383 * <p> 384 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 385 * updated after a sync operation is acknowledged back to this extension (via 386 * this method). Otherwise it will be possible for changes to be missed when 387 * the Data Sync Server is restarted or a connection error occurs. 388 * 389 * @param completedOps a list of {@link SyncOperation}s that have finished processing. 390 * The records are listed in the order they were first detected. 391 * @throws EndpointException if there is an error acknowledging the changes back to the 392 * database 393 */ 394 @Override 395 public void acknowledgeCompletedOps( 396 final LinkedList<SyncOperation> completedOps) 397 throws EndpointException { 398 throw new IllegalArgumentException("This source doesn't support realtime sync"); 399 } 400 401 402 /** 403 * Return the next batch of change records from the source. Change records 404 * are usually just hints that a change happened; they do not include 405 * the full contents of the target entry. In an effort to never synchronize 406 * stale data, the Data Sync Server will go back and fetch the full 407 * target entry for each change record. 408 * 409 * @param maxChanges the maximum number of changes to retrieve 410 * @param numStillPending this should be set to the number of unretrieved changes that 411 * are still pending after this batch has been retrieved. This will 412 * be passed in 413 * as zero, and may be left that way if the actual value cannot be 414 * determined. 415 * @return a list of {@link ChangeRecord} instances, each 416 * corresponding to a single change at the source endpoint. 417 * If there are no new changes to return, this method should return 418 * an empty list. 419 * @throws EndpointException if there is any error while retrieving the next batch of changes 420 */ 421 @Override 422 public List<ChangeRecord> getNextBatchOfChanges( 423 final int maxChanges, 424 final AtomicLong numStillPending) 425 throws EndpointException { 426 throw new IllegalArgumentException("This source doesn't support realtime sync"); 427 } 428 429 430 /** 431 * Gets a list of all the entries in the source endpoint. This is used by the 432 * 'resync' command line tool. 433 * 434 * @param outputQueue a queue of ChangeRecord objects which will be individually 435 * fetched via {@link #fetchEntry(SyncOperation)} 436 * @throws EndpointException if there is an error retrieving the list of entries to resync 437 */ 438 @Override 439 public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue) 440 throws EndpointException { 441 serverContext.debugInfo("Beginning to dump all entries..."); 442 443 MSGraphResult graphResult = msApi.getEntries(MSGraphAPI.OBJECT_TYPE.USERS, 444 filter, 445 attributeNames); 446 447 while (graphResult.size() > 0) { 448 List<Entry> entries = graphResult.entries; 449 450 if (entries == null || entries.size() == 0) { 451 break; 452 } 453 454 for (Entry e : entries) { 455 /** 456 * Now that we have an entry, create a builder and place on the builder queue 457 */ 458 try { 459 /** 460 * Let's create a ChangeRecord 461 */ 462 ChangeRecord.Builder bldr = new ChangeRecord.Builder( 463 ChangeType.MODIFY, e.getParsedDN()); 464 465 bldr.changeTime(System.currentTimeMillis()); 466 467 /** 468 * Add the full entry to the ChangeRecord. This will cause the 469 * Sync engine to skip the call to fetchEntry() and instead use this 470 * entry. 471 */ 472 bldr.fullEntry(e); 473 474 /** 475 * Put the ChangeRecord onto the outputQueue, to be processed by the sync 476 * server core 477 */ 478 outputQueue.put(bldr.build()); 479 } catch (LDAPException ex) { 480 serverContext.debugError("Problem creating the BuildRecord: " + ex.getMessage()); 481 } catch (InterruptedException ix) { 482 serverContext.debugError( "Blocking queue iterrupted: " + ix.getMessage()); 483 } 484 } 485 486 /** 487 * Let's get the nextLink data field. This will provide the URL to use to 488 * retrieve the next page of data. 489 */ 490 graphResult = msApi.nextPage(graphResult); 491 } 492 493 494 } 495 496 /** 497 * Gets a list of all the entries in the source from a given file input. 498 * This is used by the 'resync' command line tool. 499 * 500 * @param inputLines an Iterator containing the lines from the specified input file to 501 * resync (this is specified on the CLI for the resync command). 502 * These lines can be any format, for example a set of primary keys, 503 * a set of WHERE clauses, a set of full SQL queries, etc. 504 * @param outputQueue a queue of ChangeRecord objects which will be individually 505 * fetched via {@link #fetchEntry(SyncOperation)} 506 * @throws EndpointException if there is an error retrieving the list of entries to resync 507 */ 508 @Override 509 public void listAllEntries(final Iterator<String> inputLines, 510 final BlockingQueue<ChangeRecord> outputQueue) 511 throws EndpointException { 512 serverContext.debugInfo("Beginning to dump entries from file..."); 513 514 } 515 516 /** 517 * Method to encode a string value using the 'UTF-8 encoding scheme. 518 * if the value is null, then we will turn that into an empty string "" 519 * 520 * @param value 521 * @return 522 */ 523 // Method to encode a string value using `UTF-8` encoding scheme 524 private static String encodeValue(String value) { 525 if (value == null) { 526 return ""; 527 } 528 529 try { 530 return URLEncoder.encode(value, "UTF-8"); 531 } catch (UnsupportedEncodingException ex) { 532 throw new RuntimeException(ex.getCause()); 533 } 534 } 535 536 /** 537 * Helpef function to add an attribute from a JSONObject into an LDAP Entry. 538 * 539 * @param entry 540 * @param jsonObj 541 * @param attrName 542 */ 543 private void addAttrToEntry (final Entry entry, final JSONObject jsonObj, final String attrName) { 544 545 if (jsonObj == null || attrName == null) return; 546 547 String attrVal = jsonObj.getFieldAsString(attrName); 548 549 if (attrVal != null && ! "null".equals(attrVal)) { 550 entry.addAttribute(attrName, attrVal); 551 } 552 } 553}