GitHub - sashman/elasticsearch_elixir_bulk_processor: Elasticsearch Elixir Bulk Processor is a configurable manager for efficiently inserting data into Elasticsearch. This processor uses GenStages (data-exchange steps) for handling backpressure, and various settings to control the bulk payloads being uploaded to Elasticsearch.
ElasticsearchElixirBulkProcessor
Elasticsearch Elixir Bulk Processor is a configurable manager for efficiently inserting data into Elasticsearch. This processor uses GenStages for handling backpressure, and various settings to control the bulk payloads being uploaded to Elasticsearch.
Inspired by the Java Bulk Processor. Uses elasticsearch-elixir as the client. Featured on the Elastic Community Contributed Clients page.
Installation
If available in Hex, the package can be installed by adding elasticsearch_elixir_bulk_processor to your list of dependencies in mix.exs:
def deps do
  [
    {:elasticsearch_elixir_bulk_processor, "~> 0.1"}
  ]
end
Sending data
ElasticsearchElixirBulkProcessor.send_requests(list_of_items)
This sends a list of request items to Elasticsearch, using GenStages for back pressure. NOTE: It is completely reasonable to call this function with single-element lists; the mechanism aggregates the items before sending them.
If you wish to bypass the GenStage mechanism and send the data synchronously you can use:
ElasticsearchElixirBulkProcessor.Bulk.DirectUpload.add_requests(list_of_items)
Each list element must be one of the following structs:
ElasticsearchElixirBulkProcessor.Items.Index
ElasticsearchElixirBulkProcessor.Items.Create
ElasticsearchElixirBulkProcessor.Items.Update
ElasticsearchElixirBulkProcessor.Items.Delete
Examples
iex> alias ElasticsearchElixirBulkProcessor.Items.Index
...> [
...> %Index{index: "test_index", source: %{"field" => "value1"}},
...> %Index{index: "test_index", source: %{"field" => "value2"}},
...> %Index{index: "test_index", source: %{"field" => "value3"}}
...> ]
...> |> ElasticsearchElixirBulkProcessor.send_requests()
:ok
Configuration
Elasticsearch endpoint
The endpoint can be configured via the ELASTICSEARCH_URL environment variable and defaults to "http://localhost:9200".
config :elasticsearch_elixir_bulk_processor,
ElasticsearchElixirBulkProcessor.ElasticsearchCluster,
url: {:system, "ELASTICSEARCH_URL"},
api: Elasticsearch.API.HTTP
Alternatively:
config :elasticsearch_elixir_bulk_processor,
ElasticsearchElixirBulkProcessor.ElasticsearchCluster,
url: "http://localhost:9200",
api: Elasticsearch.API.HTTP
See the client configuration for more.
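A third option, sketched here on the assumption that the project uses Elixir 1.11 or later with a config/runtime.exs file, is to read the variable explicitly at boot and pass the resulting string as url. The file name and the use of System.get_env/2 are choices made for this example; the config keys are the ones shown above.

```elixir
# config/runtime.exs (assumed location; evaluated when the application boots)
import Config

# Resolve the endpoint from the environment, falling back to the documented
# default when ELASTICSEARCH_URL is not set.
config :elasticsearch_elixir_bulk_processor,
       ElasticsearchElixirBulkProcessor.ElasticsearchCluster,
       url: System.get_env("ELASTICSEARCH_URL", "http://localhost:9200"),
       api: Elasticsearch.API.HTTP
```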
Action count
Number of actions/items to send per bulk (can be changed at run time). The default is nil (unlimited):
ElasticsearchElixirBulkProcessor.set_event_count_threshold(100)
Byte size
Maximum number of bytes to send per bulk (can be changed at run time). The default is 62_914_560 (60 MiB):
ElasticsearchElixirBulkProcessor.set_byte_threshold(100)
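To illustrate how a count threshold and a byte threshold can interact, here is a minimal, self-contained sketch. It is not the library's implementation: the BatchSketch module and its chunk/3 function are hypothetical, and payloads are assumed to be already-encoded binaries.

```elixir
defmodule BatchSketch do
  @moduledoc """
  Illustrative only: splits payloads into bulks bounded by an item count
  and a byte size. This is not how the library is implemented.
  """

  # max_count may be nil, meaning "no limit on the number of items".
  def chunk(payloads, max_count, max_bytes) do
    Enum.chunk_while(
      payloads,
      {[], 0, 0},
      &step(&1, &2, max_count, max_bytes),
      &flush/1
    )
  end

  defp step(payload, {items, count, bytes}, max_count, max_bytes) do
    size = byte_size(payload)
    over_count? = not is_nil(max_count) and count + 1 > max_count
    over_bytes? = bytes + size > max_bytes

    if items != [] and (over_count? or over_bytes?) do
      # Adding this payload would cross a threshold: emit the current bulk
      # and start a new one containing only this payload.
      {:cont, Enum.reverse(items), {[payload], 1, size}}
    else
      # An oversized payload arriving at an empty bulk is still accepted,
      # so it ends up in a bulk of its own.
      {:cont, {[payload | items], count + 1, bytes + size}}
    end
  end

  # Emit whatever is left once the input is exhausted.
  defp flush({[], _count, _bytes}), do: {:cont, {[], 0, 0}}
  defp flush({items, _count, _bytes}), do: {:cont, Enum.reverse(items), {[], 0, 0}}
end
```

In this sketch, BatchSketch.chunk(["aa", "bb", "cc", "dd", "ee"], 2, 100) splits on the count limit, while BatchSketch.chunk(["aaaa", "bbbb", "cc"], nil, 6) splits on the byte limit alone.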
Action order
Whether the order of actions/items is preserved:
config :elasticsearch_elixir_bulk_processor, preserve_event_order: false
Retries
The retry policy uses the ElixirRetry DSL. See ElasticsearchElixirBulkProcessor.Bulk.Retry.policy.
config :elasticsearch_elixir_bulk_processor, retry_function: &MyApp.Retry.policy/0
Default:
def default do
  constant_backoff(100) |> Stream.take(5)
end
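The default above yields a finite stream of delays in milliseconds, so a custom policy can plausibly be any function that returns such a stream. The sketch below assumes exactly that and uses only the standard library; MyApp.Retry is the hypothetical module named in the config line above, and the doubling and cap values are arbitrary.

```elixir
defmodule MyApp.Retry do
  # Hypothetical policy: delays in milliseconds that double from 100 ms,
  # are capped at 1_000 ms, and stop after 5 attempts.
  # Assumption: the processor only needs an enumerable of delays, as the
  # default policy suggests.
  def policy do
    100
    |> Stream.iterate(&(&1 * 2))
    |> Stream.map(&min(&1, 1_000))
    |> Stream.take(5)
  end
end
```

ElixirRetry ships its own delay-stream helpers, which may express the same policy more compactly; the plain Stream version is used here so the example runs without extra dependencies.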
Success and error handlers
The callbacks invoked on a successful upload, on failed items, or on a failed request can be set through the config.
On success, the handler is called with the Elasticsearch bulk request. On failure, the handler is called with %{data: any, error: any}, data being the original payload and error being the response or HTTP error.
See ElasticsearchElixirBulkProcessor.Bulk.Handlers.
config :elasticsearch_elixir_bulk_processor,
success_function: &MyApp.success_handler/1,
error_function: &MyApp.error_handler/1
It is highly recommended that you add an error handler to make sure your data is uploaded successfully. For example, you can use the logger:
require Logger
...
def error_handler(%{data: _, error: {_, error}}) do
  error
  |> inspect()
  |> Logger.error()
end
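The handler above only matches errors shaped as a two-element tuple. A slightly fuller sketch, assuming the %{data: any, error: any} shape described earlier, adds a fallback clause and a success handler. The module name MyApp.BulkHandlers, the summarize/1 helper, and the log messages are all hypothetical.

```elixir
defmodule MyApp.BulkHandlers do
  require Logger

  # Called with the bulk request that was uploaded successfully.
  def success_handler(request) do
    Logger.debug("bulk upload succeeded: #{summarize(request)}")
  end

  # Failure shaped like the example above: error is a two-element tuple.
  def error_handler(%{data: data, error: {_kind, reason}}) do
    Logger.error("bulk upload failed: #{inspect(reason)}; payload: #{summarize(data)}")
  end

  # Fallback clause so that any other error shape is logged rather than
  # raising a FunctionClauseError inside the handler.
  def error_handler(%{data: data, error: other}) do
    Logger.error("bulk upload failed: #{inspect(other)}; payload: #{summarize(data)}")
  end

  # Truncated inspect, so large payloads do not flood the log.
  def summarize(term), do: inspect(term, limit: 5, printable_limit: 200)
end
```

With this module in place, success_function and error_function would point at &MyApp.BulkHandlers.success_handler/1 and &MyApp.BulkHandlers.error_handler/1 respectively.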
Documentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/elasticsearch_elixir_bulk_processor.
Testing script
The testing script compares insertion using direct upload with insertion using a GenStage-based approach. Start Elasticsearch with:
docker-compose up
Run test:
mix insert_test <INSERT_COUNT> <BULK_SIZE> staged|direct
staged - uses a GenStage mechanism to aggregate and insert.
direct - iterates and inserts bulk sequentially as given.