学习电脑,计算机系统故障维护,电脑技术,电脑知识学习-就上第二电脑网
当前位置: 首页 > 电脑知识 > 电脑基础

电脑无损播放器kafka, zeromq demo

 更新时间: 2019-08-06 08:27:10   作者:第二电脑网   来源:第二电脑网   浏览数:239   我要评论

kafka: 修改bin/windows/kafka-server-start.bat set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M; server.properties port=9092; 启动zk zookeeper-server-start.bat ../../…

kafka:

修改bin/windows/kafka-server-start.bat:set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M

server.properties:port=9092

启动zk:zookeeper-server-start.bat ../../config/zookeeper.properties

启动server:kafka-server-start.bat ../../config/server.properties

www.002pc.com认为此文章对《电脑无损播放器kafka, zeromq demo》说的很在理。

 

 

<!-- Maven dependencies for the Kafka demo classes below. -->
<!-- Kafka 0.8.0 artifact with the _2.8.2 suffix; its log4j dependency is excluded. -->
<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- scala-library version is the same 2.8.2 as the suffix of the Kafka artifact above. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- NOTE(review): metrics-core and zkclient are declared explicitly; presumably required by the Kafka client at runtime - confirm. -->
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>

 

 

Producer:

 

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Minimal demo producer: publishes keyed string messages to the
 * {@code TestTopic} topic via the broker listed at 127.0.0.1:9092.
 */
public class KafkaProducer {

    /**
     * Sends one message for every number from 100 (inclusive) to 1000
     * (exclusive), using the number as the key, and echoes each payload to
     * standard output. The producer is always closed before returning.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "127.0.0.1:9092");
        // Both the message value and the key are sent as strings.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "-1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        try {
            final int COUNT = 1000;
            for (int messageNo = 100; messageNo < COUNT; messageNo++) {
                String key = String.valueOf(messageNo);
                String data = "hello kafka message " + key;
                producer.send(new KeyedMessage<String, String>("TestTopic", key, data));
                System.out.println(data);
            }
        } finally {
            // Release the producer's resources even when a send fails.
            producer.close();
        }
    }
}

 Consumer:

 

 

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Minimal demo consumer: reads string messages from the {@code TestTopic}
 * topic through the ZooKeeper-based consumer connector and prints them.
 */
public class KafkaConsumer {

    /**
     * Connects via ZooKeeper at 127.0.0.1:2181 as consumer group
     * {@code jd-group}, opens a single stream for {@code TestTopic} and prints
     * every message payload to standard output. The connector is shut down
     * when the loop ends or an exception escapes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        // group.id names the consumer group this consumer belongs to.
        props.put("group.id", "jd-group");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        try {
            // Request exactly one stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("TestTopic", 1);

            // Keys and values are decoded as strings.
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get("TestTopic").get(0);

            ConsumerIterator<String, String> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println(it.next().message());
            }
            System.out.println("finished");
        } finally {
            // Always release the connector, whether the loop ended or threw.
            consumer.shutdown();
        }
    }
}

 

 

zeromq:

 

<!-- Maven dependency for the zeromq part of the demo: jeromq 0.4.2. -->
<dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jeromq</artifactId>
            <version>0.4.2</version>
        </dependency>

更多:电脑无损播放器kafka, zeromq demo
https://www.002pc.com/diannaojichu/691.html

你可能感兴趣的demo,zeromq,kafka

Tags:demo zeromq kafka

关于我们 - 广告合作 - 联系我们 - 免责声明 - 网站地图 - 投诉建议 - 在线投稿

  浙ICP备140365454号

©CopyRight 2008-2020 002pc.COM Inc All Rights Reserved. 第二电脑网 版权所有 联系QQ:282523118