Requirements for Kafka

This section describes the requirements, access privileges, and other features of HVR when using Kafka for replication. For information about the capabilities supported by HVR on Kafka, see Capabilities for Kafka.

For information about compatibility and supported versions of Kafka with HVR platforms, see Platform Compatibility Matrix.

To quickly set up replication into Kafka, see Quick Start for HVR into Kafka.

Capture Hub Integrate
No No Yes

Location Connection

This section lists and describes the connection details required for creating a Kafka location in HVR.

Field Description
Broker The hostname or IP address of the Kafka broker server.
Port The TCP port that the Kafka server uses to listen for client connections. The default port is 9092.
  Example: 9092
Note: HVR supports connecting to more than one Kafka broker server. Click ➕ to add more Kafka brokers.
Mechanism The authentication mode for connecting HVR to Kafka server (Broker). Available options:
  • No Authentication (default)
  • User Name and Password
  • Kerberos
On Linux, to use User Name and Password or Kerberos, HVR requires the library to be installed. For more information, see Installation Dependencies.
User The username to connect HVR to Kafka server. This field is enabled only if Mechanism is User Name and Password.
Password The password of the User to connect HVR to Kafka server. This field is enabled only if Mechanism is User Name and Password.
Service Name The Kerberos Service Principal Name (SPN) of the Kafka server. This field is enabled only if Mechanism is Kerberos.
Client Principal The full Kerberos principal of the client connecting to the Kafka server. This is required only on Linux.
This field is enabled only if Mechanism is Kerberos.
Client Key Tab The directory path where the Kerberos keytab file containing the key for Client Principal is located. This field is enabled only if Mechanism is Kerberos.
Default Topic The Kafka topic to which the messages are written. This can be overridden (e.g. on a specific table) by defining Integrate /Topic.

A topic name can be selected from the Broker by clicking the [...] (browse) button. Alternatively, a value can be specified containing substitutions {hvr_tbl_name} and {hvr_schema}. The default value is {hvr_tbl_name}.
  Example: {hvr_tbl_name}_avro

Schema Registry (Avro) The URL or IP address of the schema registry, used to send Confluent-compatible messages in Avro format. For more information, see Kafka Message Format.
SSL Options Displays the SSL options.
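
As an illustration of the Default Topic substitutions described in the table above, the following minimal Python sketch expands {hvr_tbl_name} and {hvr_schema} in a topic pattern. The function name expand_topic and the sample table and schema names are hypothetical. HVR performs this substitution internally, and this sketch is not its implementation.

```python
def expand_topic(pattern: str, tbl_name: str, schema: str = "") -> str:
    """Expand the {hvr_tbl_name} and {hvr_schema} substitutions in a topic pattern."""
    return (pattern
            .replace("{hvr_tbl_name}", tbl_name)
            .replace("{hvr_schema}", schema))


# Hypothetical table and schema names, for illustration only.
print(expand_topic("{hvr_tbl_name}_avro", "orders"))
print(expand_topic("{hvr_schema}.{hvr_tbl_name}", "orders", "sales"))
```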

SSL Options

Field Description
Enable SSL Enable/disable (one way) SSL. If enabled, HVR authenticates the Kafka server by validating the SSL certificate shared by the Kafka server.
Two-way SSL Enable/disable two-way SSL. If enabled, both HVR and the Kafka server authenticate each other by validating each other's SSL certificate. This field is enabled only if Enable SSL is selected.
SSL Public Certificate The directory path where the .pem file containing the client's SSL public certificate is located. This field is enabled only if Two-way SSL is selected.
SSL Private Key The directory path where the .pem file containing the client's SSL private key is located. This field is enabled only if Two-way SSL is selected.
Client Private Key Password The password of the private key file that is specified in SSL Private Key. This field is enabled only if Two-way SSL is selected.

Installation Dependencies

On Linux, to use the Kafka authentication Mechanism (User Name and Password or Kerberos), HVR requires the library to be installed. This library is part of Cyrus SASL and can be installed as follows:

$ yum install cyrus-sasl		# On Red Hat Linux, CentOS
$ zypper install cyrus-sasl		# On SUSE Linux

Kafka Message Format

HVR's Kafka location sends messages in JSON format by default, unless the location option Schema Registry (Avro) is used, in which case each message uses Kafka Connect's compact AVRO-based format. Note that this is not true AVRO, because each message would not be a valid AVRO file (e.g. it has no file header). Rather, each message is a 'micro AVRO', containing fragments of data encoded using AVRO's data type serialization format. Both JSON (using mode SCHEMA_PAYLOAD, see FileFormat /JsonMode) and the 'micro AVRO' format conform to Confluent's 'Kafka Connect' message format standard. The Confluent Schema Registry AVRO-based mode cannot be used with action FileFormat parameters such as /Json, /Xml, /Csv, /AvroCompression or /Parquet.

Action FileFormat parameters /Xml, /Csv, /Avro or /Parquet can be used to send messages in other formats. If parameter /Avro is chosen without enabling location option Schema Registry (Avro), then each message is a valid AVRO file (including a header with the schema and column information), rather than Kafka Connect's more compact AVRO-based format.
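
To make the 'micro AVRO' idea concrete, the following Python sketch splits a message into a schema ID and an Avro-encoded body. It assumes the messages use the usual Confluent framing (one zero 'magic' byte, a 4-byte big-endian schema ID, then the Avro-encoded data). This page does not spell out the framing, so treat that as an assumption. The function name and the sample bytes are hypothetical.

```python
import struct


def split_confluent_header(message: bytes):
    """Split a message into (schema_id, avro_body), assuming Confluent framing."""
    if len(message) < 5:
        raise ValueError("message too short for a 5-byte header")
    # ">bI" = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, message[5:]


# Hypothetical message: schema ID 42 followed by placeholder body bytes.
sample = b"\x00" + (42).to_bytes(4, "big") + b"\x02\x06abc"
schema_id, body = split_confluent_header(sample)
print(schema_id, body)
```

The body has no AVRO file header, so a consumer would need the schema fetched from the registry under that ID to decode it.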

The Kafka messages should also contain special columns containing the operation type (delete, insert and update) and the sequence number. To achieve this, define action ColumnProperties for the Kafka location as shown below:

Group Table Action
KAFKA * ColumnProperties /Name=op_val /Extra /Datatype=integer /IntegrateExpression="{hvr_op}"
KAFKA * ColumnProperties /Name=integ_seq /Extra /Datatype=varchar /Length=36 /IntegrateExpression="{hvr_integ_seq}" /TimeKey

Metadata for Messages

To process HVR's messages, a Kafka consumer will often need metadata (table and column names, data types etc.) about the replicated table. If the location is defined with option Schema Registry (Avro), then the consumer can read this from that registry. For JSON format with the default mode (SCHEMA_PAYLOAD), each message contains this information. Another way to include metadata in each message is to add actions ColumnProperties /Extra /IntegrateExpression to add values like {hvr_tbl_name} and {hvr_op}.
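
As a sketch of how a consumer might read this metadata from a JSON message in SCHEMA_PAYLOAD mode, the Python example below assumes the standard Kafka Connect JSON envelope (a "schema" object with a "fields" list, plus a "payload" object). The sample message, including its table and column names, is invented for illustration and is not actual HVR output.

```python
import json


def describe_message(raw: str):
    """Return (table_name, {column: type}, row) from a schema+payload JSON message."""
    msg = json.loads(raw)
    schema = msg["schema"]
    columns = {field["field"]: field["type"] for field in schema["fields"]}
    return schema.get("name"), columns, msg["payload"]


# Hypothetical message in Kafka Connect's schema+payload JSON layout.
sample = json.dumps({
    "schema": {
        "type": "struct",
        "name": "orders",
        "optional": False,
        "fields": [
            {"field": "id", "type": "int32", "optional": False},
            {"field": "status", "type": "string", "optional": True},
        ],
    },
    "payload": {"id": 7, "status": "shipped"},
})

table, columns, row = describe_message(sample)
print(table, columns, row)
```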

Kafka Message Bundling and Size

By default, each Kafka message contains just one row, regardless of the format chosen. Multiple rows can be bundled into a single message using Integrate /MessageBundling with one of the following values: CHANGE (an update message contains both the 'before' and 'after' rows; inserts and deletes contain just one row), TRANSACTION (a message contains all rows associated with a captured transaction) or THRESHOLD (a message is filled with rows until it reaches a limit). Bundled messages simply consist of the contents of several single-row messages concatenated together.

Although bundling of multiple rows can be combined with the Kafka Connect compatible formats (JSON with default mode SCHEMA_PAYLOAD and Schema Registry AVRO-based format), the resulting (longer) messages no longer conform to Confluent's 'Kafka Connect' standard.

For bundling modes TRANSACTION and THRESHOLD, the number of rows in each message is affected by parameter MessageBundlingThreshold (default is 800,000). For those bundling modes, rows continue to be bundled into the same message until this threshold is exceeded. After that happens, the message is sent and new rows are bundled into the next message. Parameter MessageBundlingThreshold has no effect for bundling modes ROW or CHANGE.
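
The threshold rule can be sketched in a few lines of Python. This is an illustration of the behaviour described above, not HVR's implementation. It assumes the threshold is compared against the accumulated size in bytes of the serialized rows, and the function name bundle_rows is hypothetical.

```python
def bundle_rows(rows, threshold=800_000):
    """Yield bundles of rows; a bundle is closed once its total size exceeds threshold."""
    bundle, size = [], 0
    for row in rows:
        bundle.append(row)
        size += len(row)
        if size > threshold:      # threshold exceeded: send this bundle
            yield bundle
            bundle, size = [], 0  # new rows go into the next message
    if bundle:                    # rows left over at the end of the cycle
        yield bundle


# Seven 4-byte rows with a small threshold of 10 bytes, for illustration.
bundles = list(bundle_rows([b"aaaa"] * 7, threshold=10))
print([len(b) for b in bundles])
```

Because a bundle is closed only after the threshold has been exceeded, a bundled message can end up somewhat larger than the threshold itself.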

By default, the maximum size of a Kafka message is 1,000,000 bytes; HVR will not send a message exceeding this size and will instead give a fatal error. You can change the maximum Kafka message size that HVR will send by defining $HVR_KAFKA_MSG_MAX_BYTES, but take care not to exceed the maximum message size configured in the Kafka broker (setting message.max.bytes). If a message exceeds the broker's limit, the message will be lost.
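
The size check described above can be mimicked with a short Python sketch, for example to validate test messages before they reach a pipeline. It is not HVR's code: the function names are hypothetical, and only the variable name HVR_KAFKA_MSG_MAX_BYTES and the 1,000,000 byte default come from this page.

```python
import os

DEFAULT_MAX_BYTES = 1_000_000  # default maximum message size stated above


def max_message_bytes(env=os.environ) -> int:
    """Read the limit from HVR_KAFKA_MSG_MAX_BYTES, falling back to the default."""
    return int(env.get("HVR_KAFKA_MSG_MAX_BYTES", DEFAULT_MAX_BYTES))


def check_message_size(message: bytes, env=os.environ) -> None:
    """Raise ValueError if the message is larger than the configured limit."""
    limit = max_message_bytes(env)
    if len(message) > limit:
        raise ValueError(f"message of {len(message)} bytes exceeds limit of {limit}")


# Explicit env dicts are used here so the example does not depend on the shell.
print(max_message_bytes({}))
check_message_size(b"x" * 100, env={})  # within the default limit, no error
try:
    check_message_size(b"x" * 100, env={"HVR_KAFKA_MSG_MAX_BYTES": "50"})
except ValueError as exc:
    print("rejected:", exc)
```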

If a message is too big to be sent because it contains multiple rows, then less bundling (e.g. /MessageBundling=ROW) or a lower MessageBundlingThreshold can help by reducing the number of rows in each message. Otherwise, the number of bytes used for each row must be lowered, either with a more compact message format or by truncating a column value (by adding action ColumnProperties /TrimDatatype to the capture location).

Syncing Kafka, Interruption of Message Sending, and Consuming Messages with Idempotence

An HVR integrate job performs a sync of messages sent into Kafka at the end of each integrate cycle, instead of after each individual message. This means that if the job is interrupted while it is sending messages, then when it is restarted, the sending of multiple rows from the interrupted cycle may be repeated. Programs consuming Kafka messages must be able to cope with this repetition; this is called being 'idempotent'. One technique for being idempotent is to track an increasing sequence in each message and use it to detect which messages have already been processed. A column with such an increasing sequence can be defined using action ColumnProperties /Name=integ_key /Extra /Datatype=varchar /Length=32 /IntegrateExpression="{hvr_integ_seq}". If HVR resends a message, its contents will be identical each time, including this sequence number.
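
A minimal Python sketch of this technique is shown below. It assumes each decoded message is a dict carrying the integ_key column defined above, and that the sequence values are fixed-width strings that sort in increasing order when compared as text. Both are assumptions. The class name and the sample values are hypothetical.

```python
class IdempotentProcessor:
    """Process each message once, skipping repeats by tracking the highest sequence."""

    def __init__(self):
        self.last_seq = ""    # highest sequence value processed so far
        self.processed = []   # stand-in for the real side effect (e.g. a DB write)

    def handle(self, message: dict) -> bool:
        seq = message["integ_key"]
        if seq <= self.last_seq:
            return False      # already processed: this is a resent message
        self.processed.append(message)
        self.last_seq = seq
        return True


proc = IdempotentProcessor()
first_cycle = [{"integ_key": "0001", "id": 1}, {"integ_key": "0002", "id": 2}]
# After an interruption, the cycle is resent and then continues with a new row.
restarted = first_cycle + [{"integ_key": "0003", "id": 3}]
for msg in first_cycle + restarted:
    proc.handle(msg)
print(len(proc.processed))
```

In a real consumer, the last processed sequence would need to be stored durably (ideally together with the processed data) and tracked separately for each partition, since Kafka only orders messages within a partition.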

Kafka Message Keys and Partitioning

Kafka messages can contain a 'key' which Kafka uses to put messages into partitions, so consuming can be parallelized. HVR typically puts a key into each message which contains a hash computed from values in the 'distribution key' column of each row. This key is present only if the messages are in JSON or AVRO format. It is not present when the message bundling (/MessageBundling) mode is TRANSACTION or THRESHOLD.
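
The effect of a message key on partitioning can be illustrated with the Python sketch below. It uses CRC32 as a stand-in hash; the hash that HVR computes for the key and the partitioner the Kafka client actually applies are not described here and may differ. The function name and the sample keys are hypothetical.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition; equal keys always map to the same one."""
    return zlib.crc32(key) % num_partitions


# Hypothetical distribution-key values for three rows; two share the same key.
keys = [b"customer-17", b"customer-42", b"customer-17"]
partitions = [partition_for(k, 6) for k in keys]
print(partitions)
```

Because rows with the same distribution key land in the same partition, a consumer reading that partition sees the changes for that key in order, while other partitions can be consumed in parallel.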

Known Issue

When using Kafka broker version or, an existing bug (KAFKA-3547) in Kafka causes a timeout error in HVR.
To resolve this issue, define action Environment for the Kafka location as mentioned below:

Group Table Action
KAFKA * Environment /Name=HVR_KAFKA_BROKER_VERSION /Value=

Note that if the Kafka broker version used is then /Value=