Skip to content

Commit

Permalink
TLSConfiguration: separate client and broker TLS configuration (#111)
Browse files Browse the repository at this point in the history
* Refactor `KafkaConfiguration.TLSConfiguration`

Motivation:

Previously, `TLSConfiguration` forced users into providing a key store /
key pair for client verification and did not expose a canonical way to
disable client verification.

Modifications:

* `KafkaConfiguration.TLSConfiguration`
    * separate `Client` and `Broker` `TLS` configuration
    * provide default `TLSConfiguration`: `.disableClientVerification`,
      and verify broker identity
* `KafkaConfiguration.SecurityProtocol`:
    * provide `TLSConfiguration()` as default parameter value for
      `TLSConfiguration:`
* update `TLSConfiguration` examples in `README`

* Rename Security configuration types

* Review David

Co-authored-by: David Nadoba <[email protected]>

* Review Franz

Modifications:

* consistent naming: `disabled` instead of `disable`
* rename `rootCert` -> `trustRoots`
* rename `caCertificates` -> `trustRoots`
* rename `crlLocation` -> `certificateRevocationListPath`

Co-authored-by: Franz Busch <[email protected]>

* Review David

Modifications:

* add `certificateRevocationListPath` parameter default value `nil`
* `KafkaConfiguration.TLSConfiguration.brokerVerification`: more readable default value documentation

* README: Update security examples

---------

Co-authored-by: David Nadoba <[email protected]>
Co-authored-by: Franz Busch <[email protected]>
  • Loading branch information
3 people authored Aug 14, 2023
1 parent 950d1f8 commit a8eb315
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 125 deletions.
35 changes: 2 additions & 33 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,23 +204,8 @@ configuration.securityProtocol = .plaintext
#### TLS

```swift
let leafCert = KafkaConfiguration.TLSConfiguration.LeafAndIntermediates.pem("YOUR_LEAF_CERTIFICATE")
let rootCert = KafkaConfiguration.TLSConfiguration.Root.pem("YOUR_ROOT_CERTIFICATE")

let privateKey = KafkaConfiguration.TLSConfiguration.PrivateKey(
location: .file(location: "KEY_FILE"),
password: ""
)

let tlsConfig = KafkaConfiguration.TLSConfiguration.keyPair(
privateKey: privateKey,
publicKeyCertificate: leafCert,
caCertificate: rootCert,
crlLocation: nil
)

var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .tls(configuration: tlsConfig)
configuration.securityProtocol = .tls()
```

#### SASL
Expand All @@ -239,30 +224,14 @@ config.securityProtocol = .saslPlaintext(
#### SASL + TLS

```swift
let leafCert = KafkaConfiguration.TLSConfiguration.LeafAndIntermediates.pem("YOUR_LEAF_CERTIFICATE")
let rootCert = KafkaConfiguration.TLSConfiguration.Root.pem("YOUR_ROOT_CERTIFICATE")

let privateKey = KafkaConfiguration.TLSConfiguration.PrivateKey(
location: .file(location: "KEY_FILE"),
password: ""
)

let tlsConfig = KafkaConfiguration.TLSConfiguration.keyPair(
privateKey: privateKey,
publicKeyCertificate: leafCert,
caCertificate: rootCert,
crlLocation: nil
)

let saslMechanism = KafkaConfiguration.SASLMechanism.scramSHA256(
username: "USERNAME",
password: "PASSWORD"
)

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslTLS(
saslMechanism: saslMechanism,
tlsConfiguaration: tlsConfig
saslMechanism: saslMechanism
)
```

Expand Down
194 changes: 105 additions & 89 deletions Sources/Kafka/Configuration/KafkaConfiguration+Security.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,28 @@ extension KafkaConfiguration {
}
}

public struct Root: Sendable, Hashable {
internal enum _Root: Sendable, Hashable {
public struct TrustRoots: Sendable, Hashable {
internal enum _TrustRoots: Sendable, Hashable {
case probe
case disableBrokerVerification
case file(location: String)
case pem(String)
}

let _internal: _Root
let _internal: _TrustRoots

/// A list of standard paths will be probed and the first one found will be used as the default root certificate location path.
public static let probe = Root(_internal: .probe)

/// Disable OpenSSL's built-in broker (server) certificate verification.
public static let disableBrokerVerification = Root(_internal: .disableBrokerVerification)
public static let probe = TrustRoots(_internal: .probe)

/// File or directory path to root certificate(s) for verifying the broker's key.
public static func file(location: String) -> Root {
return Root(
public static func file(location: String) -> TrustRoots {
return TrustRoots(
_internal: .file(location: location)
)
}

/// Root certificate String for verifying the broker's key.
public static func pem(_ pem: String) -> Root {
return Root(
/// Trust roots certificate String for verifying the broker's key.
public static func pem(_ pem: String) -> TrustRoots {
return TrustRoots(
_internal: .pem(pem)
)
}
Expand Down Expand Up @@ -122,112 +118,134 @@ extension KafkaConfiguration {
}
}

internal enum _TLSConfiguration: Sendable, Hashable {
case keyPair(
/// Configuration for the TLS identity of the client.
public struct ClientIdentity: Sendable, Hashable {
internal enum _ClientIdentity: Sendable, Hashable {
case keyPair(
privateKey: PrivateKey,
certificates: LeafAndIntermediates
)
case keyStore(keyStore: KeyStore)
}

let _internal: _ClientIdentity

/// Use TLS client verification with a given private/public key pair.
///
/// - Parameters:
/// - privateKey: The client's private key (PEM) used for authentication.
/// - certificate: The client's public key (PEM) used for authentication.
public static func keyPair(
privateKey: PrivateKey,
publicKeyCertificate: LeafAndIntermediates,
caCertificate: Root,
crlLocation: String?
)
case keyStore(
keyStore: KeyStore,
caCertificate: Root,
crlLocation: String?
)
certificates: LeafAndIntermediates
) -> ClientIdentity {
return .init(
_internal: .keyPair(
privateKey: privateKey,
certificates: certificates
)
)
}

/// Use TLS client verification with a given key store.
///
/// - Parameters:
/// - keyStore: The client's keystore (PKCS#12) used for authentication.
public static func keyStore(keyStore: KeyStore) -> ClientIdentity {
return .init(_internal: .keyStore(keyStore: keyStore))
}
}

let _internal: _TLSConfiguration

/// Use TLS with a given private/public key pair.
///
/// - Parameters:
///
/// - privateKey: The client's private key (PEM) used for authentication.
/// - publicKeyCertificate: The client's public key (PEM) used for authentication.
/// - caCertificate: File or directory path to CA certificate(s) for verifying the broker's key.
/// - crLocation: Path to CRL for verifying broker's certificate validity.
public static func keyPair(
privateKey: PrivateKey,
publicKeyCertificate: LeafAndIntermediates,
caCertificate: Root = .probe,
crlLocation: String?
) -> TLSConfiguration {
return TLSConfiguration(
_internal: .keyPair(
privateKey: privateKey,
publicKeyCertificate: publicKeyCertificate,
caCertificate: caCertificate,
crlLocation: crlLocation
/// Configuration for the TLS verification of the broker.
public struct BrokerVerification: Sendable, Hashable {
internal enum _BrokerVerification: Sendable, Hashable {
case disabled
case verify(
trustRoots: TrustRoots,
certificateRevocationListPath: String?
)
)
}
}

let _internal: _BrokerVerification

/// Do not verify the identity of the broker.
public static let disabled: BrokerVerification = .init(_internal: .disabled)

///
/// - Parameters:
///
/// - keyStore: The client's keystore (PKCS#12) used for authentication.
/// - caCertificate: File or directory path to CA certificate(s) for verifying the broker's key.
/// - crlLocation: Path to CRL for verifying broker's certificate validity.
public static func keyStore(
keyStore: KeyStore,
caCertificate: Root = .probe,
crlLocation: String?
) -> TLSConfiguration {
return TLSConfiguration(
_internal: .keyStore(
keyStore: keyStore,
caCertificate: caCertificate,
crlLocation: crlLocation
/// Verify the identity of the broker.
///
/// Parameters:
/// - trustRoots: File or directory path to CA certificate(s) for verifying the broker's key.
/// - certificateRevocationListPath: Path to CRL for verifying broker's certificate validity.
public static func verify(
trustRoots: TrustRoots = .probe,
certificateRevocationListPath: String? = nil
) -> BrokerVerification {
return .init(
_internal: .verify(
trustRoots: trustRoots,
certificateRevocationListPath: certificateRevocationListPath
)
)
)
}
}

/// Configuration for the TLS verification of the client.
/// Default: `nil`
public var clientIdentity: ClientIdentity? = nil

/// Configuration for the TLS verification of the broker.
/// Default: `verify(trustRoots: .probe, certificateRevocationListPath: nil)``
public var brokerVerification: BrokerVerification = .verify(
trustRoots: .probe,
certificateRevocationListPath: nil
)

public init() {}

// MARK: TLSConfiguration + Dictionary

internal var dictionary: [String: String] {
var resultDict: [String: String] = [:]

switch self._internal {
case .keyPair(let privateKey, let publicKeyCertificate, let caCertificate, let crlLocation):
// Client TLS Verification
switch self.clientIdentity?._internal {
case .none:
break
case .keyPair(let privateKey, let certificate):
switch privateKey.key._internal {
case .file(location: let location):
resultDict["ssl.key.location"] = location
case .pem(let pem):
resultDict["ssl.key.pem"] = pem
}
resultDict["ssl.key.password"] = privateKey.password
switch publicKeyCertificate._internal {
switch certificate._internal {
case .file(location: let location):
resultDict["ssl.key.location"] = location
resultDict["ssl.certificate.location"] = location
case .pem(let pem):
resultDict["ssl.certificate.pem"] = pem
}
switch caCertificate._internal {
case .disableBrokerVerification:
resultDict["enable.ssl.certificate.verification"] = String(false)
case .probe:
resultDict["ssl.ca.location"] = "probe"
case .file(location: let location):
resultDict["ssl.ca.location"] = location
case .pem(let pem):
resultDict["ssl.ca.pem"] = pem
}
resultDict["ssl.crl.location"] = crlLocation
case .keyStore(let keyStore, let caCertificate, let crlLocation):
case .keyStore(let keyStore):
resultDict["ssl.keystore.location"] = keyStore.location
resultDict["ssl.keystore.password"] = keyStore.password
switch caCertificate._internal {
case .disableBrokerVerification:
resultDict["enable.ssl.certificate.verification"] = String(false)
}

// Broker TLS Verification
switch self.brokerVerification._internal {
case .disabled:
resultDict["enable.ssl.certificate.verification"] = String(false)
case .verify(let trustRoots, let certificateRevocationListPath):
resultDict["enable.ssl.certificate.verification"] = String(true)
switch trustRoots._internal {
case .probe:
resultDict["ssl.ca.location"] = "probe"
case .file(location: let location):
resultDict["ssl.ca.location"] = location
case .pem(let pem):
resultDict["ssl.ca.pem"] = pem
}
resultDict["ssl.crl.location"] = crlLocation
resultDict["ssl.crl.location"] = certificateRevocationListPath
}

return resultDict
Expand Down Expand Up @@ -277,7 +295,7 @@ extension KafkaConfiguration {
}

/// Disable automatic key refresh by setting this property.
public static let disable: KeyRefreshAttempts = .init(rawValue: 0)
public static let disabled: KeyRefreshAttempts = .init(rawValue: 0)
}

/// Minimum time in between key refresh attempts.
Expand Down Expand Up @@ -311,7 +329,6 @@ extension KafkaConfiguration {
/// Default OAuthBearer method.
///
/// - Parameters:
///
/// - configuration: SASL/OAUTHBEARER configuration.
/// The format is implementation-dependent and must be parsed accordingly.
/// The default unsecured token implementation (see https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes space-separated name=value pairs with valid names including principalClaimName, principal, scopeClaimName, scope, and lifeSeconds.
Expand All @@ -327,7 +344,6 @@ extension KafkaConfiguration {
/// OpenID Connect (OIDC).
///
/// - Parameters:
///
/// - configuration: SASL/OAUTHBEARER configuration.
/// The format is implementation-dependent and must be parsed accordingly.
/// The default unsecured token implementation (see https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes space-separated name=value pairs with valid names including principalClaimName, principal, scopeClaimName, scope, and lifeSeconds.
Expand Down Expand Up @@ -481,7 +497,7 @@ extension KafkaConfiguration {
)

/// Use the Transport Layer Security (TLS) protocol.
public static func tls(configuration: TLSConfiguration) -> SecurityProtocol {
public static func tls(configuration: TLSConfiguration = TLSConfiguration()) -> SecurityProtocol {
return SecurityProtocol(
_internal: .tls(configuration: configuration)
)
Expand All @@ -497,7 +513,7 @@ extension KafkaConfiguration {
/// Use the Simple Authentication and Security Layer (SASL) with TLS.
public static func saslTLS(
saslMechanism: SASLMechanism,
tlsConfiguaration: TLSConfiguration
tlsConfiguaration: TLSConfiguration = TLSConfiguration()
) -> SecurityProtocol {
return SecurityProtocol(
_internal: .saslTLS(saslMechanism: saslMechanism, tlsConfiguaration: tlsConfiguaration)
Expand Down
6 changes: 3 additions & 3 deletions Sources/Kafka/Configuration/KafkaConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public enum KafkaConfiguration {
}

/// Disable the intervalled refresh (not recommended).
public static let disable: RefreshInterval = .init(rawValue: -1)
public static let disabled: RefreshInterval = .init(rawValue: -1)
}

/// Period of time at which topic and broker metadata is refreshed to proactively discover any new brokers, topics, partitions or partition leader changes.
Expand Down Expand Up @@ -165,7 +165,7 @@ public enum KafkaConfiguration {
}

/// Disable disconnecting from the broker on a number of send failures.
public static let disable: MaximumFailures = .init(rawValue: 0)
public static let disabled: MaximumFailures = .init(rawValue: 0)
}

/// Disconnect from the broker when this number of send failures (e.g., timed-out requests) is reached.
Expand Down Expand Up @@ -224,7 +224,7 @@ public enum KafkaConfiguration {
}

/// Disable the backoff and reconnect immediately.
public static let disable: Backoff = .init(rawValue: 0)
public static let disabled: Backoff = .init(rawValue: 0)
}

/// The initial time to wait before reconnecting to a broker after the connection has been closed.
Expand Down

0 comments on commit a8eb315

Please sign in to comment.