elasticsearch cassandra river


Yesterday I tried to integrate Elasticsearch 2.1 with Cassandra 2.1 using srecon/elasticsearch-cassandra-river.

After the integration I found out that every time the elasticsearch-cassandra-river job loads data from Cassandra and imports it into Elasticsearch for indexing, it creates a duplicate of every record.

For example, take a simple table:

CREATE TABLE IF NOT EXISTS nortpole.users(
user_id text PRIMARY KEY,
user_name text
);

INSERT INTO nortpole.users(user_id, user_name) VALUES('1', 'Bob');
INSERT INTO nortpole.users(user_id, user_name) VALUES('2', 'alice');

I created an index for the river:

curl -XPUT 'http://localhost:9200/_river/cassandra-river/_meta' -d '{
  "type" : "cassandra",
  "cassandra" : {
    "cluster_name" : "Test Cluster",
    "keyspace" : "nortpole",
    "column_family" : "users",
    "batch_size" : 20000,
    "hosts" : "localhost",
    "dcName" : "DC",
    "cron" : "0/60 * * * * ?"
  },
  "index" : {
    "index" : "prodinfo",
    "type" : "product"
  }
}'

This init command schedules the cron job to run every minute, so after two minutes the following query already returns two results for the same row:

curl -XPOST "http://localhost:9200/prodinfo/product/_search" -d'
{
  "query": {
    "query_string": {
      "query": "Bob"
    }
  }
}'

But what causes this problem?

According to the source of CassandraRiver.java at

https://github.com/srecon/elasticsearch-cassandra-river/blob/master/src/main/java/com/blu/es/cassandra/CassandraRiver.java

every record read from Cassandra is sent to Elasticsearch with a freshly generated id:


public class CassandraRiver extends AbstractRiverComponent implements River {
    // ...

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // ...
        // a fresh random UUID is generated for every Cassandra row on every run
        String rowId = UUID.randomUUID().toString();
        values.put(rowId, rows);
        // ...
        threadExecutor.execute(new Indexer(getBatchSize(), values, getTypeName(), getIndexName()));
    }

    public Indexer(int batchSize, Map<String, Map<String, String>> keys, String typeName, String indexName) {
        this.batchSize = batchSize;
        this.keys = keys;
        // ...
    }

    public void run() {
        // ...
        for (String key : this.keys.keySet()) {
            try {
                // the random UUID is used as the Elasticsearch document id, so every
                // run indexes the same row again under a new id instead of updating it
                bulk.add(Requests.indexRequest(this.indexName).type(this.typeName)
                        .id(key).source(this.keys.get(key)));
            } catch (Exception e) {
                LOGGER.error("Exception in run {}", e);
            }
        }
        // ...
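
A straightforward fix would be to derive the document id from the Cassandra primary key instead of a random UUID, so that re-running the job overwrites existing documents rather than adding new ones. The following is only a minimal sketch of that idea, assuming the row map still carries the user_id column from the example table; it is not the river's actual code.

import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Requests;

public class IdempotentIndexer {

    // Hypothetical sketch: use the Cassandra primary key (user_id in the example
    // table) as the Elasticsearch document id, so re-indexing the same row
    // overwrites the existing document instead of creating a duplicate.
    public void addRow(BulkRequestBuilder bulk, String indexName, String typeName,
                       Map<String, String> row) {
        String docId = row.get("user_id"); // deterministic id instead of UUID.randomUUID()
        bulk.add(Requests.indexRequest(indexName).type(typeName)
                .id(docId).source(row));
    }
}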

The following line also shows that every indexing run reads the entire table. For a huge table this full scan becomes a performance problem.

String SQL = "select * from " + getColumnFamily() + " ALLOW FILTERING;";
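
If the whole table has to be re-read anyway, the scan should at least be paged so rows are streamed in chunks rather than held in memory all at once. The sketch below shows the idea with the DataStax Java driver's fetch size, reusing the host and keyspace from the example above; it is an illustration, not the river's actual implementation.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

public class PagedScan {
    public static void main(String[] args) {
        // Connect to the example keyspace from this post.
        try (Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
             Session session = cluster.connect("nortpole")) {

            // Let the driver page through the table 1000 rows at a time
            // instead of materialising the whole result set at once.
            Statement stmt = new SimpleStatement("SELECT * FROM users");
            stmt.setFetchSize(1000);

            for (Row row : session.execute(stmt)) {
                // each row would be handed to the bulk indexer here
                System.out.println(row.getString("user_id") + " -> " + row.getString("user_name"));
            }
        }
    }
}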
