import org.zeromq.ZMQ;
/**
 * ZeroMQ publisher demo.
 *
 * <p>Binds a PUB socket on TCP port 5555 and repeatedly sends the fixed
 * message {@code "toutiao hello"} until the current thread is interrupted.
 */
public class Publisher {
    /**
     * Runs the publish loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1); // create a context with one I/O thread
        try {
            ZMQ.Socket publisher = context.socket(ZMQ.PUB);
            try {
                publisher.bind("tcp://*:5555");

                // The message never changes, so encode it once instead of on
                // every iteration. An explicit charset keeps the bytes on the
                // wire independent of the platform default encoding.
                String message = "toutiao hello";
                byte[] payload = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);

                // NOTE(review): the loop has no pause, so it sends as fast as
                // the socket accepts messages.
                while (!Thread.currentThread().isInterrupted()) {
                    publisher.send(payload);
                    System.out.println("sent : " + message);
                }
            } finally {
                // Close the socket even when bind/send throws.
                publisher.close();
            }
        } finally {
            // Terminate the context only after its socket has been closed.
            context.term();
        }
    }
}
import org.zeromq.ZMQ;
/**
 * ZeroMQ subscriber demo.
 *
 * <p>Starts ten threads. Each thread creates its own context and SUB socket,
 * connects to {@code tcp://127.0.0.1:5555}, subscribes with the bytes of
 * {@code "toutiao"}, and prints every message it receives.
 */
public class Subscriber {
    /**
     * Launches the subscriber threads.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        for (int j = 0; j < 10; j++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    ZMQ.Context context = ZMQ.context(1);
                    try {
                        ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
                        try {
                            // connect/subscribe sit inside the try so a failure
                            // here still releases the socket and the context.
                            subscriber.connect("tcp://127.0.0.1:5555");
                            subscriber.subscribe(
                                    "toutiao".getBytes(java.nio.charset.StandardCharsets.UTF_8));

                            while (!Thread.currentThread().isInterrupted()) {
                                byte[] message = subscriber.recv();
                                if (message == null) {
                                    // NOTE(review): recv() may hand back null when no
                                    // message was obtained (error/interruption, depending
                                    // on the binding) - confirm. Stop cleanly instead of
                                    // hitting a NullPointerException in new String(...).
                                    break;
                                }
                                System.out.println(Thread.currentThread().getName() + " receive : "
                                        + new String(message, java.nio.charset.StandardCharsets.UTF_8));
                            }
                        } finally {
                            subscriber.close();
                        }
                    } finally {
                        // Terminate the context only after its socket has been closed.
                        context.term();
                    }
                }
            }).start();
        }
    }
}
更多:电脑无损播放器kafka, zeromq demo
https://www.002pc.com/diannaojichu/691.html
你可能感兴趣的demo,zeromq,kafka
